HDFS-15423 RBF: WebHDFS create shouldn't choose DN from all sub-clusters #2903

Merged
merged 10 commits into from Apr 15, 2021
RouterWebHdfsMethods.java

@@ -93,6 +93,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -104,6 +105,8 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * WebHDFS Router implementation. This is an extension of
@@ -453,21 +456,33 @@ private DatanodeInfo chooseDatanode(final Router router,
       final String path, final HttpOpParam.Op op, final long openOffset,
       final String excludeDatanodes) throws IOException {
     final RouterRpcServer rpcServer = getRPCServer(router);
-    DatanodeInfo[] dns = null;
+    DatanodeInfo[] dns = {};
+    String resolvedNs = "";
     try {
       dns = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
     } catch (IOException e) {
       LOG.error("Cannot get the datanodes from the RPC server", e);
     }
 
+    if (op == PutOpParam.Op.CREATE) {
+      try {
+        resolvedNs = rpcServer.getCreateLocation(path).getNameserviceId();
+      } catch (IOException e) {
+        LOG.error("Cannot get the name service " +
+            "to create file for path {} ", path, e);
+      }
+    }
+
     HashSet<Node> excludes = new HashSet<Node>();
     if (excludeDatanodes != null) {
-      Collection<String> collection =
-          getTrimmedStringCollection(excludeDatanodes);
-      for (DatanodeInfo dn : dns) {
-        if (collection.contains(dn.getName())) {
-          excludes.add(dn);
-        }
+      Collection<String> collection =
+          getTrimmedStringCollection(excludeDatanodes);
+      for (DatanodeInfo dn : dns) {
+        String ns = getNsFromDataNodeNetworkLocation(dn.getNetworkLocation());
+        if (collection.contains(dn.getName())) {
+          excludes.add(dn);
+        } else if (op == PutOpParam.Op.CREATE && !ns.equals(resolvedNs)) {
+          // for CREATE, the dest dn should be in the resolved ns
+          excludes.add(dn);
+        }
       }

@@ -502,6 +517,22 @@ private DatanodeInfo chooseDatanode(final Router router,
     return getRandomDatanode(dns, excludes);
   }

+  /**
+   * Get the nameservice info from datanode network location.
+   * @param location network location with format `/ns0/rack1`
+   * @return nameservice this datanode is in
+   */
+  @VisibleForTesting
+  public static String getNsFromDataNodeNetworkLocation(String location) {
+    // network location should be in the format of /ns/rack
+    Pattern pattern = Pattern.compile("^/([^/]*)/");
+    Matcher matcher = pattern.matcher(location);
+    if (matcher.find()) {
+      return matcher.group(1);
+    }
+    return "";
+  }
+
   /**
    * Get a random Datanode from a subcluster.
    * @param dns Nodes to be chosen from.
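For reference, a minimal standalone sketch of the parsing rule the new helper relies on (illustrative only, not part of the patch): a datanode's network location is expected to look like /<nameservice>/<rack...>, and anything that does not match yields the empty string, so for CREATE such a datanode can never equal the resolved nameservice and gets excluded.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative demo class, not part of the patch.
public class NsLocationDemo {
  // Same pattern as getNsFromDataNodeNetworkLocation above:
  // capture the first path component of a location like "/ns0/rack1".
  private static final Pattern NS_PATTERN = Pattern.compile("^/([^/]*)/");

  static String nsOf(String location) {
    Matcher m = NS_PATTERN.matcher(location);
    return m.find() ? m.group(1) : "";
  }

  public static void main(String[] args) {
    System.out.println(nsOf("/ns0/rack1"));      // ns0
    System.out.println(nsOf("/ns1/row2/rack3")); // ns1
    System.out.println(nsOf("/row0"));           // "" (no second slash)
    System.out.println(nsOf("rack-only"));       // "" (no leading slash)
  }
}

A location with no rack segment resolves to the empty string, which is why the new unit test below expects "" for "/row0".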
RouterWebHDFSContract.java

@@ -64,6 +64,8 @@ public static void createCluster(Configuration conf) throws IOException {
     conf.addResource(CONTRACT_WEBHDFS_XML);
 
     cluster = new MiniRouterDFSCluster(true, 2, conf);
+    cluster.setIndependentDNs();
+    cluster.setNumDatanodesPerNameservice(3);
 
     // Start NNs and DNs and wait until ready
     cluster.startCluster(conf);
MiniRouterDFSCluster.java

@@ -774,14 +774,23 @@ public void startCluster(Configuration overrideConf) {
     }
     topology.setFederation(true);
 
+    // Generate conf for namenodes and datanodes
+    String ns0 = nameservices.get(0);
+    Configuration nnConf = generateNamenodeConfiguration(ns0);
+    if (overrideConf != null) {
+      nnConf.addResource(overrideConf);
+      // Router also uses this configuration as initial values.
+      routerConf = new Configuration(overrideConf);
+    }
+
     // Set independent DNs across subclusters
     int numDNs = nameservices.size() * numDatanodesPerNameservice;
     Configuration[] dnConfs = null;
     if (!sharedDNs) {
       dnConfs = new Configuration[numDNs];
       int dnId = 0;
       for (String nsId : nameservices) {
-        Configuration subclusterConf = new Configuration();
+        Configuration subclusterConf = new Configuration(nnConf);
         subclusterConf.set(DFS_INTERNAL_NAMESERVICES_KEY, nsId);
         for (int i = 0; i < numDatanodesPerNameservice; i++) {
           dnConfs[dnId] = subclusterConf;
@@ -791,14 +800,6 @@
     }
 
     // Start mini DFS cluster
-    String ns0 = nameservices.get(0);
-    Configuration nnConf = generateNamenodeConfiguration(ns0);
-    if (overrideConf != null) {
-      nnConf.addResource(overrideConf);
-      // Router also uses this configuration as initial values.
-      routerConf = new Configuration(overrideConf);
-    }
-
     cluster = new MiniDFSCluster.Builder(nnConf)
         .numDataNodes(numDNs)
         .nnTopology(topology)
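The reordering above matters because each per-subcluster conf is now built with Configuration's copy constructor from nnConf, so nnConf must exist before the DN loop and the datanodes inherit the namenode and test-override settings before pinning their own nameservice. A minimal sketch of that copy semantics (illustrative, the override key below is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Illustrative sketch, not part of the patch.
public class ConfInheritanceDemo {
  public static void main(String[] args) {
    Configuration nnConf = new Configuration(false);
    nnConf.set("some.override.from.the.test", "value"); // hypothetical key

    // Copy constructor clones the source properties.
    Configuration subclusterConf = new Configuration(nnConf);
    subclusterConf.set(DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY, "ns0");

    // The copy keeps the inherited setting and adds its own.
    System.out.println(subclusterConf.get("some.override.from.the.test"));
    System.out.println(
        subclusterConf.get(DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY));
  }
}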
TestRouterWebHdfsMethods.java (new file)

@@ -0,0 +1,147 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createMountTableEntry;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test suite for Router WebHDFS methods.
 */
public class TestRouterWebHdfsMethods {
  static final Logger LOG =
      LoggerFactory.getLogger(TestRouterWebHdfsMethods.class);

  private static StateStoreDFSCluster cluster;
  private static RouterContext router;
  private static String httpUri;

  @BeforeClass
  public static void globalSetUp() throws Exception {
    cluster = new StateStoreDFSCluster(false, 2);
    Configuration conf = new RouterConfigBuilder()
        .stateStore()
        .rpc()
        .http()
        .admin()
        .build();
    cluster.addRouterOverrides(conf);
    cluster.startCluster();
    cluster.startRouters();
    cluster.waitClusterUp();
    router = cluster.getRandomRouter();
    httpUri = "http://" + router.getHttpAddress();
  }

  @AfterClass
  public static void tearDown() {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  @Test
  public void testWebHdfsCreate() throws Exception {
    // the file is created at the default ns (ns0)
    String path = "/tmp/file";
    URL url = new URL(getUri(path));
    LOG.info("URL: {}", url);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("PUT");
    assertEquals(HttpURLConnection.HTTP_CREATED, conn.getResponseCode());
    verifyFile("ns0", path, true);
    verifyFile("ns1", path, false);
    conn.disconnect();
  }

  @Test
  public void testWebHdfsCreateWithMounts() throws Exception {
    // the file is created at the mounted ns (ns1)
    String mountPoint = "/tmp-ns1";
    String path = "/tmp-ns1/file";
    createMountTableEntry(
        router.getRouter(), mountPoint,
        DestinationOrder.RANDOM, Collections.singletonList("ns1"));
    URL url = new URL(getUri(path));
    LOG.info("URL: {}", url);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("PUT");
    assertEquals(HttpURLConnection.HTTP_CREATED, conn.getResponseCode());
    verifyFile("ns1", path, true);
    verifyFile("ns0", path, false);
    conn.disconnect();
  }

  private String getUri(String path) {
    final String user = System.getProperty("user.name");
    final StringBuilder uri = new StringBuilder(httpUri);
    uri.append("/webhdfs/v1")
        .append(path)
        .append("?op=CREATE")
        .append("&user.name=").append(user);
    return uri.toString();
  }

  private void verifyFile(String ns, String path, boolean shouldExist)
      throws Exception {
    FileSystem fs = cluster.getNamenode(ns, null).getFileSystem();
    try {
      fs.getFileStatus(new Path(path));
      if (!shouldExist) {
        fail(path + " should not exist in ns " + ns);
      }
    } catch (FileNotFoundException e) {
      if (shouldExist) {
        fail(path + " should exist in ns " + ns);
      }
    }
  }

  @Test
  public void testGetNsFromDataNodeNetworkLocation() {
    assertEquals("ns0", RouterWebHdfsMethods
        .getNsFromDataNodeNetworkLocation("/ns0/rack-info1"));
    assertEquals("ns0", RouterWebHdfsMethods
        .getNsFromDataNodeNetworkLocation("/ns0/row1/rack-info1"));
    assertEquals("", RouterWebHdfsMethods
        .getNsFromDataNodeNetworkLocation("/row0"));
    assertEquals("", RouterWebHdfsMethods
        .getNsFromDataNodeNetworkLocation("whatever-rack-info1"));
  }
}
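Both create tests let HttpURLConnection follow the WebHDFS two-step flow automatically: the router answers the PUT with a 307 redirect to a datanode, and following it yields the 201 asserted above. A hedged variant (sketch, not part of the patch) if one wanted to assert which datanode the router picked rather than the end result:

    // Sketch, not part of the patch: stop at the redirect instead of
    // following it, then inspect the chosen datanode.
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("PUT");
    conn.setInstanceFollowRedirects(false); // do not auto-follow
    assertEquals(307, conn.getResponseCode()); // 307 Temporary Redirect
    // Location should name a datanode in the resolved nameservice.
    LOG.info("Redirected to {}", conn.getHeaderField("Location"));
    conn.disconnect();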