Skip to content

Commit

Permalink
YARN-9860. Enable service mode for Docker containers on YARN
Browse files Browse the repository at this point in the history
           Contributed by Prabhu Joseph and Shane Kumpf
  • Loading branch information
macroadster committed Oct 10, 2019
1 parent 7a4b3d4 commit 31e0122
Show file tree
Hide file tree
Showing 14 changed files with 305 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.swagger.annotations.ApiModelProperty;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;

import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlEnum;
Expand Down Expand Up @@ -73,13 +74,15 @@ public String toString() {
private TypeEnum type = null;
private String destFile = null;
private String srcFile = null;
private LocalResourceVisibility visibility = null;
private Map<String, String> properties = new HashMap<>();

public ConfigFile copy() {
ConfigFile copy = new ConfigFile();
copy.setType(this.getType());
copy.setSrcFile(this.getSrcFile());
copy.setDestFile(this.getDestFile());
copy.setVisibility(this.visibility);
if (this.getProperties() != null && !this.getProperties().isEmpty()) {
copy.getProperties().putAll(this.getProperties());
}
Expand Down Expand Up @@ -150,6 +153,26 @@ public void setSrcFile(String srcFile) {
this.srcFile = srcFile;
}


/**
* Visibility of the Config file.
**/
public ConfigFile visibility(LocalResourceVisibility localrsrcVisibility) {
this.visibility = localrsrcVisibility;
return this;
}

@ApiModelProperty(example = "null", value = "Visibility of the Config file")
@JsonProperty("visibility")
public LocalResourceVisibility getVisibility() {
return visibility;
}

@XmlElement(name = "visibility", defaultValue="APPLICATION")
public void setVisibility(LocalResourceVisibility localrsrcVisibility) {
this.visibility = localrsrcVisibility;
}

/**
A blob of key value pairs that will be dumped in the dest_file in the format
as specified in type. If src_file is specified, src_file content are dumped
Expand Down Expand Up @@ -200,12 +223,13 @@ public boolean equals(java.lang.Object o) {
return Objects.equals(this.type, configFile.type)
&& Objects.equals(this.destFile, configFile.destFile)
&& Objects.equals(this.srcFile, configFile.srcFile)
&& Objects.equals(this.visibility, configFile.visibility)
&& Objects.equals(this.properties, configFile.properties);
}

@Override
public int hashCode() {
return Objects.hash(type, destFile, srcFile, properties);
return Objects.hash(type, destFile, srcFile, visibility, properties);
}

@Override
Expand All @@ -217,6 +241,8 @@ public String toString() {
.append(" destFile: ").append(toIndentedString(destFile))
.append("\n")
.append(" srcFile: ").append(toIndentedString(srcFile)).append("\n")
.append(" visibility: ").append(toIndentedString(visibility))
.append("\n")
.append(" properties: ").append(toIndentedString(properties))
.append("\n")
.append("}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,21 @@ public int actionDestroy(String serviceName) throws YarnException,
+ appDir);
ret = EXIT_NOT_FOUND;
}

// Delete Public Resource Dir
Path publicResourceDir = new Path(fs.getBasePath(), serviceName);
if (fileSystem.exists(publicResourceDir)) {
if (fileSystem.delete(publicResourceDir, true)) {
LOG.info("Successfully deleted public resource dir for "
+ serviceName + ": " + publicResourceDir);
} else {
String message = "Failed to delete public resource dir for service "
+ serviceName + " at: " + publicResourceDir;
LOG.info(message);
throw new YarnException(message);
}
}

try {
deleteZKNode(serviceName);
// don't set destroySucceed to false if no ZK node exists because not
Expand Down Expand Up @@ -1315,7 +1330,8 @@ private boolean addAMLog4jResource(String serviceName, Configuration conf,
new Path(remoteConfPath, YarnServiceConstants.YARN_SERVICE_LOG4J_FILENAME);
copy(conf, localFilePath, remoteFilePath);
LocalResource localResource =
fs.createAmResource(remoteConfPath, LocalResourceType.FILE);
fs.createAmResource(remoteConfPath, LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION);
localResources.put(localFilePath.getName(), localResource);
hasAMLog4j = true;
} else {
Expand Down Expand Up @@ -1465,7 +1481,7 @@ private void addKeytabResourceIfSecure(SliderFileSystem fileSystem,
return;
}
LocalResource keytabRes = fileSystem.createAmResource(keytabOnhdfs,
LocalResourceType.FILE);
LocalResourceType.FILE, LocalResourceVisibility.PRIVATE);
localResource.put(String.format(YarnServiceConstants.KEYTAB_LOCATION,
service.getName()), keytabRes);
LOG.info("Adding " + service.getName() + "'s keytab for "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public interface YarnServiceConstants {

String SERVICES_DIRECTORY = "services";

String SERVICES_PUBLIC_DIRECTORY = "/tmp/hadoop-yarn/staging/";

/**
* JVM property to define the service lib directory;
* this is set by the yarn.sh script
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.api.records.ConfigFormat;
Expand Down Expand Up @@ -191,6 +192,17 @@ public static Path initCompInstanceDir(SliderFileSystem fs,
return compInstanceDir;
}

public static Path initCompPublicResourceDir(SliderFileSystem fs,
ContainerLaunchService.ComponentLaunchContext compLaunchContext,
ComponentInstance instance) {
Path compDir = fs.getComponentPublicResourceDir(
compLaunchContext.getServiceVersion(), compLaunchContext.getName());
Path compPublicResourceDir = new Path(compDir,
instance.getCompInstanceName());
return compPublicResourceDir;
}


// 1. Create all config files for a component on hdfs for localization
// 2. Add the config file to localResource
public static synchronized void createConfigFileAndAddLocalResource(
Expand All @@ -212,6 +224,20 @@ public static synchronized void createConfigFileAndAddLocalResource(
log.info("Component instance conf dir already exists: " + compInstanceDir);
}

Path compPublicResourceDir = initCompPublicResourceDir(fs,
compLaunchContext, instance);
if (!fs.getFileSystem().exists(compPublicResourceDir)) {
log.info("{} version {} : Creating Public Resource dir on hdfs: {}",
instance.getCompInstanceId(), compLaunchContext.getServiceVersion(),
compPublicResourceDir);
fs.getFileSystem().mkdirs(compPublicResourceDir,
new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE,
FsAction.EXECUTE));
} else {
log.info("Component instance public resource dir already exists: "
+ compPublicResourceDir);
}

log.debug("Tokens substitution for component instance: {}{}{}" + instance
.getCompInstanceName(), System.lineSeparator(), tokensForSubstitution);

Expand All @@ -236,7 +262,14 @@ public static synchronized void createConfigFileAndAddLocalResource(
* substitution and merges in new configs, and writes a new file to
* compInstanceDir/fileName.
*/
Path remoteFile = new Path(compInstanceDir, fileName);
Path remoteFile = null;
LocalResourceVisibility visibility = configFile.getVisibility();
if (visibility != null &&
visibility.equals(LocalResourceVisibility.PUBLIC)) {
remoteFile = new Path(compPublicResourceDir, fileName);
} else {
remoteFile = new Path(compInstanceDir, fileName);
}

if (!fs.getFileSystem().exists(remoteFile)) {
log.info("Saving config file on hdfs for component " + instance
Expand Down Expand Up @@ -268,7 +301,8 @@ public static synchronized void createConfigFileAndAddLocalResource(

// Add resource for localization
LocalResource configResource =
fs.createAmResource(remoteFile, LocalResourceType.FILE);
fs.createAmResource(remoteFile, LocalResourceType.FILE,
configFile.getVisibility());
Path destFile = new Path(configFile.getDestFile());
String symlink = APP_CONF_DIR + "/" + fileName;
addLocalResource(launcher, symlink, configResource, destFile,
Expand Down Expand Up @@ -311,7 +345,8 @@ public static synchronized void handleStaticFilesForLocalization(
LocalResource localResource = fs.createAmResource(sourceFile,
(staticFile.getType() == ConfigFile.TypeEnum.ARCHIVE ?
LocalResourceType.ARCHIVE :
LocalResourceType.FILE));
LocalResourceType.FILE), staticFile.getVisibility());

Path destFile = new Path(sourceFile.getName());
if (staticFile.getDestFile() != null && !staticFile.getDestFile()
.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
Expand All @@ -43,7 +44,8 @@ public void processArtifact(AbstractLauncher launcher,
}
log.info("Adding resource {}", artifact);
LocalResourceType type = LocalResourceType.ARCHIVE;
LocalResource packageResource = fileSystem.createAmResource(artifact, type);
LocalResource packageResource = fileSystem.createAmResource(artifact, type,
LocalResourceVisibility.APPLICATION);
launcher.addLocalResource(APP_LIB_DIR, packageResource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,19 @@ public Path getHomeDirectory() {
* @param resourceType resource type
* @return the local resource for AM
*/
public LocalResource createAmResource(Path destPath, LocalResourceType resourceType) throws IOException {
public LocalResource createAmResource(Path destPath,
LocalResourceType resourceType,
LocalResourceVisibility visibility) throws IOException {

FileStatus destStatus = fileSystem.getFileStatus(destPath);
LocalResource amResource = Records.newRecord(LocalResource.class);
amResource.setType(resourceType);
// Set visibility of the resource
// Setting to most private option
amResource.setVisibility(LocalResourceVisibility.APPLICATION);
if (visibility == null) {
visibility = LocalResourceVisibility.APPLICATION;
}
amResource.setVisibility(visibility);
// Set the resource to be copied over
amResource.setResource(
URL.fromPath(fileSystem.resolvePath(destStatus.getPath())));
Expand Down Expand Up @@ -419,7 +425,7 @@ public Map<String, LocalResource> submitDirectory(Path srcDir, String destRelati
for (FileStatus entry : fileset) {

LocalResource resource = createAmResource(entry.getPath(),
LocalResourceType.FILE);
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION);
String relativePath = destRelativeDir + "/" + entry.getPath().getName();
localResources.put(relativePath, resource);
}
Expand Down Expand Up @@ -465,7 +471,8 @@ public LocalResource submitFile(File localFile, Path tempPath, String subdir, St
// Set the type of resource - file or archive
// archives are untarred at destination
// we don't need the jar file to be untarred for now
return createAmResource(destPath, LocalResourceType.FILE);
return createAmResource(destPath, LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION);
}

/**
Expand All @@ -483,7 +490,7 @@ public void submitTarGzipAndUpdate(
BadClusterStateException {
Path dependencyLibTarGzip = getDependencyTarGzip();
LocalResource lc = createAmResource(dependencyLibTarGzip,
LocalResourceType.ARCHIVE);
LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION);
providerResources.put(YarnServiceConstants.DEPENDENCY_LOCALIZED_DIR_LINK, lc);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -63,6 +64,26 @@ public Path getComponentDir(String serviceVersion, String compName) {
serviceVersion + "/" + compName);
}

public Path getBasePath() {
String tmpDir = configuration.get("hadoop.tmp.dir");
String basePath = YarnServiceConstants.SERVICE_BASE_DIRECTORY
+ "/" + YarnServiceConstants.SERVICES_DIRECTORY;
return new Path(tmpDir, basePath);
}

/**
* Returns the component public resource directory path.
*
* @param serviceVersion service version
* @param compName component name
* @return component public resource directory
*/
public Path getComponentPublicResourceDir(String serviceVersion,
String compName) {
return new Path(new Path(getBasePath(), getAppDir().getName() + "/"
+ "components"), serviceVersion + "/" + compName);
}

/**
* Deletes the component directory.
*
Expand All @@ -77,6 +98,12 @@ public void deleteComponentDir(String serviceVersion, String compName)
fileSystem.delete(path, true);
LOG.debug("deleted dir {}", path);
}
Path publicResourceDir = getComponentPublicResourceDir(serviceVersion,
compName);
if (fileSystem.exists(publicResourceDir)) {
fileSystem.delete(publicResourceDir, true);
LOG.debug("deleted public resource dir {}", publicResourceDir);
}
}

/**
Expand All @@ -92,6 +119,13 @@ public void deleteComponentsVersionDirIfEmpty(String serviceVersion)
fileSystem.delete(path, true);
LOG.info("deleted dir {}", path);
}
Path publicResourceDir = new Path(new Path(getBasePath(),
getAppDir().getName() + "/" + "components"), serviceVersion);
if (fileSystem.exists(publicResourceDir)
&& fileSystem.listStatus(publicResourceDir).length == 0) {
fileSystem.delete(publicResourceDir, true);
LOG.info("deleted public resource dir {}", publicResourceDir);
}
}


Expand Down
Loading

0 comments on commit 31e0122

Please sign in to comment.