
HADOOP-18036: Backport HADOOP-17653 for branch-3.2 #3758

Open
wants to merge 1 commit into base: branch-3.2

@@ -88,7 +88,18 @@ protected String getRootDir() {
if (this.rootDirectory == null) {
String dir = getConf().get(FEDERATION_STORE_FILE_DIRECTORY);
if (dir == null) {
File tempDir = Files.createTempDir();
File tempDirBase =
new File(System.getProperty("java.io.tmpdir"));
File tempDir = null;
try {
tempDir = java.nio.file.Files.createTempDirectory(
tempDirBase.toPath(), System.currentTimeMillis() + "-").toFile();
} catch (IOException e) {
// fallback to the base upon exception.
LOG.debug("Unable to create a temporary directory. Fall back to " +
" the default system temp directory {}", tempDirBase, e);
tempDir = tempDirBase;
}
dir = tempDir.getAbsolutePath();
LOG.warn("The root directory is not available, using {}", dir);
}
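For context, the pattern being backported here replaces Guava's deprecated Files.createTempDir() with the JDK's java.nio.file.Files.createTempDirectory(). A minimal standalone sketch of that pattern, assuming only the JDK; the class and method names below are illustrative and not part of the patch:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class TempDirExample {
  public static File newTempDir() throws IOException {
    // Base directory mirrors the fallback used in the hunk above.
    File base = new File(System.getProperty("java.io.tmpdir"));
    // createTempDirectory creates the directory with restrictive permissions
    // and throws IOException on failure, unlike Guava's deprecated
    // Files.createTempDir(), which throws an unchecked exception.
    return Files.createTempDirectory(base.toPath(),
        System.currentTimeMillis() + "-").toFile();
  }
}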
@@ -16,8 +16,6 @@
*/
package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;

import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -28,7 +26,9 @@
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
Contributor comment:
doesn't compile. We didn't relocate Guava classes in 3.2.x
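Since branch-3.2 does not ship the relocated org.apache.hadoop.thirdparty packages, the import would presumably need to keep the unrelocated Guava coordinates this test used before the change, e.g.:

import com.google.common.collect.Lists;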

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -44,6 +44,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import java.util.Optional;
import java.util.Random;
@@ -74,7 +75,9 @@ public void setUp() throws IOException {

conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
"localhost:" + port);
tempDir = Files.createTempDir();
File testDir = GenericTestUtils.getTestDir();
tempDir = Files
.createTempDirectory(testDir.toPath(), "test").toFile();
File levelDBDir = new File(tempDir, BPID);
levelDBDir.mkdirs();
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
@@ -16,7 +16,6 @@
*/
package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl;

import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -26,12 +25,14 @@
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.test.GenericTestUtils;
import org.iq80.leveldb.DBException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
import static org.mockito.Mockito.doThrow;
@@ -60,7 +61,9 @@ public void setUp() throws IOException {

conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
"localhost:" + port);
tempDir = Files.createTempDir();
File testDir = GenericTestUtils.getTestDir();
tempDir = Files
.createTempDirectory(testDir.toPath(), "test").toFile();
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
tempDir.getAbsolutePath());
levelDBAliasMapServer.setConf(conf);
@@ -103,6 +103,8 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -1016,6 +1018,131 @@ ApplicationId submitApp(Service app) throws IOException, YarnException {
return submissionContext.getApplicationId();
}

/**
* Compress (tar) the input files to the output file.
*
* @param files The files to compress
* @param output The resulting output file (should end in .tar.gz)
* @param bundleRoot The root path that is stripped from nested file paths
* to form the entry names in the archive
* @throws IOException if the archive cannot be written
*/
public static File compressFiles(Collection<File> files, File output,
String bundleRoot) throws IOException {
try (FileOutputStream fos = new FileOutputStream(output);
TarArchiveOutputStream taos = new TarArchiveOutputStream(
new BufferedOutputStream(fos))) {
taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
for (File f : files) {
addFilesToCompression(taos, f, "sysfs", bundleRoot);
}
}
return output;
}

/**
* Add a file or directory to the archive, recursing into nested directories.
*
* @param taos The archive
* @param file The file to add to the archive
* @param dir The directory that should serve as
* the parent directory in the archive
* @param bundleRoot The root path used to compute relative entry names for
* nested directories
* @throws IOException if an entry cannot be added to the archive
*/
private static void addFilesToCompression(TarArchiveOutputStream taos,
File file, String dir, String bundleRoot) throws IOException {
if (!file.isHidden()) {
// Create an entry for the file
if (!dir.equals(".")) {
if (File.separator.equals("\\")) {
dir = dir.replaceAll("\\\\", "/");
}
}
taos.putArchiveEntry(
new TarArchiveEntry(file, dir + "/" + file.getName()));
if (file.isFile()) {
// Add the file to the archive
try (FileInputStream input = new FileInputStream(file)) {
IOUtils.copy(input, taos);
taos.closeArchiveEntry();
}
} else if (file.isDirectory()) {
// close the archive entry
if (!dir.equals(".")) {
taos.closeArchiveEntry();
}
// go through all the files in the directory and using recursion, add
// them to the archive
File[] allFiles = file.listFiles();
if (allFiles != null) {
for (File childFile : allFiles) {
addFilesToCompression(taos, childFile,
file.getPath().substring(bundleRoot.length()), bundleRoot);
}
}
}
}
}

private void addYarnSysFs(Path path,
Contributor comment:
I suspect the change in ServiceClient.java is totally unnecessary, because the method is not used at all.

Map<String, LocalResource> localResources, Service app)
throws IOException {
List<Component> componentsWithYarnSysFS = new ArrayList<Component>();
for(Component c : app.getComponents()) {
boolean enabled = Boolean.parseBoolean(c.getConfiguration()
.getEnv(ApplicationConstants.Environment
.YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE.name()));
if (enabled) {
componentsWithYarnSysFS.add(c);
}
}
if(componentsWithYarnSysFS.size() == 0) {
return;
}
String buffer = ServiceApiUtil.jsonSerDeser.toJson(app);
File testDir =
new File(System.getProperty("java.io.tmpdir"));
File tmpDir = Files.createTempDirectory(
testDir.toPath(), System.currentTimeMillis() + "-").toFile();
if (tmpDir.exists()) {
String serviceJsonPath = tmpDir.getAbsolutePath() + "/app.json";
File localFile = new File(serviceJsonPath);
if (localFile.createNewFile()) {
try (Writer writer = new OutputStreamWriter(
new FileOutputStream(localFile), StandardCharsets.UTF_8)) {
writer.write(buffer);
}
} else {
throw new IOException("Fail to write app.json to temp directory");
}
File destinationFile = new File(tmpDir.getAbsolutePath() + "/sysfs.tar");
if (!destinationFile.createNewFile()) {
throw new IOException("Fail to localize sysfs.tar.");
}
List<File> files = new ArrayList<File>();
files.add(localFile);
compressFiles(files, destinationFile, "sysfs");
LocalResource localResource =
fs.submitFile(destinationFile, path, ".", "sysfs.tar");
Path serviceJson = new Path(path, "sysfs.tar");
for (Component c : componentsWithYarnSysFS) {
ConfigFile e = new ConfigFile();
e.type(TypeEnum.ARCHIVE);
e.srcFile(serviceJson.toString());
e.destFile("/hadoop/yarn");
if (!c.getConfiguration().getFiles().contains(e)) {
c.getConfiguration().getFiles().add(e);
}
}
localResources.put("sysfs", localResource);
if (!tmpDir.delete()) {
LOG.warn("Failed to delete temp file: " + tmpDir.getAbsolutePath());
}
} else {
throw new IOException("Fail to localize sysfs resource.");
}
}

private void setLogAggregationContext(Service app, Configuration conf,
ApplicationSubmissionContext submissionContext) {
LogAggregationContext context = Records.newRecord(LogAggregationContext
@@ -29,7 +29,7 @@
import java.util.Map;
import java.util.TreeSet;

import org.apache.curator.shaded.com.google.common.base.Joiner;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
Contributor comment:
doesn't compile. the relocated guava classes do not exist in 3.2.x.
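As with the earlier comment, a branch-3.2 build would presumably need an import that actually exists on that branch, either the original curator-shaded class or the plain Guava one, e.g.:

import com.google.common.base.Joiner;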

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;