-
Notifications
You must be signed in to change notification settings - Fork 9.1k
HADOOP-18036 : Backport HADOOP-17653 for branch-3.2 #3758
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: branch-3.2
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -103,6 +103,8 @@ | |
import java.net.URI; | ||
import java.net.URISyntaxException; | ||
import java.nio.ByteBuffer; | ||
import java.nio.charset.StandardCharsets; | ||
import java.nio.file.Files; | ||
import java.text.MessageFormat; | ||
import java.util.*; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
@@ -1016,6 +1018,131 @@ ApplicationId submitApp(Service app) throws IOException, YarnException { | |
return submissionContext.getApplicationId(); | ||
} | ||
|
||
/** | ||
* Compress (tar) the input files to the output file. | ||
* | ||
* @param files The files to compress | ||
* @param output The resulting output file (should end in .tar.gz) | ||
* @param bundleRoot | ||
* @throws IOException | ||
*/ | ||
public static File compressFiles(Collection<File> files, File output, | ||
String bundleRoot) throws IOException { | ||
try (FileOutputStream fos = new FileOutputStream(output); | ||
TarArchiveOutputStream taos = new TarArchiveOutputStream( | ||
new BufferedOutputStream(fos))) { | ||
taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU); | ||
for (File f : files) { | ||
addFilesToCompression(taos, f, "sysfs", bundleRoot); | ||
} | ||
} | ||
return output; | ||
} | ||
|
||
/** | ||
* Compile file list for compression and going recursive for | ||
* nested directories. | ||
* | ||
* @param taos The archive | ||
* @param file The file to add to the archive | ||
* @param dir The directory that should serve as | ||
* the parent directory in the archive | ||
* @throws IOException | ||
*/ | ||
private static void addFilesToCompression(TarArchiveOutputStream taos, | ||
File file, String dir, String bundleRoot) throws IOException { | ||
if (!file.isHidden()) { | ||
// Create an entry for the file | ||
if (!dir.equals(".")) { | ||
if (File.separator.equals("\\")) { | ||
dir = dir.replaceAll("\\\\", "/"); | ||
} | ||
} | ||
taos.putArchiveEntry( | ||
new TarArchiveEntry(file, dir + "/" + file.getName())); | ||
if (file.isFile()) { | ||
// Add the file to the archive | ||
try (FileInputStream input = new FileInputStream(file)) { | ||
IOUtils.copy(input, taos); | ||
taos.closeArchiveEntry(); | ||
} | ||
} else if (file.isDirectory()) { | ||
// close the archive entry | ||
if (!dir.equals(".")) { | ||
taos.closeArchiveEntry(); | ||
} | ||
// go through all the files in the directory and using recursion, add | ||
// them to the archive | ||
File[] allFiles = file.listFiles(); | ||
if (allFiles != null) { | ||
for (File childFile : allFiles) { | ||
addFilesToCompression(taos, childFile, | ||
file.getPath().substring(bundleRoot.length()), bundleRoot); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
private void addYarnSysFs(Path path, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i suspect the the change in the ServiceClient.java is totally unnecessary, because the method is not used at all. |
||
Map<String, LocalResource> localResources, Service app) | ||
throws IOException { | ||
List<Component> componentsWithYarnSysFS = new ArrayList<Component>(); | ||
for(Component c : app.getComponents()) { | ||
boolean enabled = Boolean.parseBoolean(c.getConfiguration() | ||
.getEnv(ApplicationConstants.Environment | ||
.YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE.name())); | ||
if (enabled) { | ||
componentsWithYarnSysFS.add(c); | ||
} | ||
} | ||
if(componentsWithYarnSysFS.size() == 0) { | ||
return; | ||
} | ||
String buffer = ServiceApiUtil.jsonSerDeser.toJson(app); | ||
File testDir = | ||
new File(System.getProperty("java.io.tmpdir")); | ||
File tmpDir = Files.createTempDirectory( | ||
testDir.toPath(), System.currentTimeMillis() + "-").toFile(); | ||
if (tmpDir.exists()) { | ||
String serviceJsonPath = tmpDir.getAbsolutePath() + "/app.json"; | ||
File localFile = new File(serviceJsonPath); | ||
if (localFile.createNewFile()) { | ||
try (Writer writer = new OutputStreamWriter( | ||
new FileOutputStream(localFile), StandardCharsets.UTF_8)) { | ||
writer.write(buffer); | ||
} | ||
} else { | ||
throw new IOException("Fail to write app.json to temp directory"); | ||
} | ||
File destinationFile = new File(tmpDir.getAbsolutePath() + "/sysfs.tar"); | ||
if (!destinationFile.createNewFile()) { | ||
throw new IOException("Fail to localize sysfs.tar."); | ||
} | ||
List<File> files = new ArrayList<File>(); | ||
files.add(localFile); | ||
compressFiles(files, destinationFile, "sysfs"); | ||
LocalResource localResource = | ||
fs.submitFile(destinationFile, path, ".", "sysfs.tar"); | ||
Path serviceJson = new Path(path, "sysfs.tar"); | ||
for (Component c : componentsWithYarnSysFS) { | ||
ConfigFile e = new ConfigFile(); | ||
e.type(TypeEnum.ARCHIVE); | ||
e.srcFile(serviceJson.toString()); | ||
e.destFile("/hadoop/yarn"); | ||
if (!c.getConfiguration().getFiles().contains(e)) { | ||
c.getConfiguration().getFiles().add(e); | ||
} | ||
} | ||
localResources.put("sysfs", localResource); | ||
if (!tmpDir.delete()) { | ||
LOG.warn("Failed to delete temp file: " + tmpDir.getAbsolutePath()); | ||
} | ||
} else { | ||
throw new IOException("Fail to localize sysfs resource."); | ||
} | ||
} | ||
|
||
private void setLogAggregationContext(Service app, Configuration conf, | ||
ApplicationSubmissionContext submissionContext) { | ||
LogAggregationContext context = Records.newRecord(LogAggregationContext | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,7 @@ | |
import java.util.Map; | ||
import java.util.TreeSet; | ||
|
||
import org.apache.curator.shaded.com.google.common.base.Joiner; | ||
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. doesn't compile. the relocated guava classes do not exist in 3.2.x. |
||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.hadoop.yarn.api.records.Resource; | ||
import org.apache.hadoop.yarn.api.records.ResourceInformation; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't compile. We didn't relocate Guava classes in 3.2.x