Skip to content

Fixes for starting mulitple, and making ApplicationMasterServiceImpl … #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public interface YarnClientParameters {
*/
String getApplicationName();

/**
* The type of this YARN application. Defaults to "Kitten"
*/
String getApplicationType();

/**
* The queue the application master is assigned to run in.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import java.util.Map;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;

Expand Down Expand Up @@ -97,6 +99,11 @@ public Configuration getConfiguration() {
public String getApplicationName() {
return env.getString(LuaFields.APP_NAME);
}

@Override
public String getApplicationType() {
return env.isNil(LuaFields.APP_TYPE) ? "kitten" : env.getString(LuaFields.APP_TYPE);
}

@Override
public String getQueue() {
Expand Down Expand Up @@ -166,4 +173,6 @@ public long getClientTimeoutMillis() {
return env.isNil(LuaFields.TIMEOUT) ? -1 : env.getLong(LuaFields.TIMEOUT);
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,31 @@ public class YarnClientServiceImpl extends AbstractScheduledService
private final YarnClientParameters parameters;
private final MasterConnectionFactory<YarnClient> yarnClientFactory;
private final Stopwatch stopwatch;

private Credentials credentials;
private YarnClient yarnClient;
private ApplicationId applicationId;
private ApplicationReport finalReport;
private boolean timeout = false;

public YarnClientServiceImpl(YarnClientParameters params) {
this(params, new YarnClientFactory(params.getConfiguration()),
new Stopwatch());
new Stopwatch(), new Credentials());
}

public YarnClientServiceImpl(YarnClientParameters params, Credentials credentials) {
this(params, new YarnClientFactory(params.getConfiguration()),
new Stopwatch(), credentials);
}

public YarnClientServiceImpl(YarnClientParameters parameters,
MasterConnectionFactory<YarnClient> yarnClientFactory,
Stopwatch stopwatch) {
Stopwatch stopwatch, Credentials credentials) {
this.parameters = Preconditions.checkNotNull(parameters);
this.yarnClientFactory = yarnClientFactory;
this.stopwatch = stopwatch;
this.credentials = credentials;
if (this.credentials == null)
this.credentials = new Credentials();
}

@Override
Expand All @@ -89,7 +97,6 @@ protected void startUp() throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
Configuration conf = this.yarnClientFactory.getConfig();
FileSystem fs = FileSystem.get(conf);
Credentials credentials = new Credentials();
String tokenRenewer = this.yarnClientFactory.getConfig().get(YarnConfiguration.RM_PRINCIPAL);
if (tokenRenewer == null || tokenRenewer.length() == 0) {
throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer");
Expand All @@ -116,6 +123,8 @@ protected void startUp() throws IOException {
ApplicationSubmissionContext appContext = clientApp.getApplicationSubmissionContext();
this.applicationId = appContext.getApplicationId();
appContext.setApplicationName(parameters.getApplicationName());
if (parameters.getApplicationType() != null)
appContext.setApplicationType(parameters.getApplicationType());

// Setup the container for the application master.
ContainerLaunchParameters appMasterParams = parameters.getApplicationMasterParameters(applicationId);
Expand All @@ -140,7 +149,7 @@ public void run() {
stopwatch.start();
}

private void submitApplication(ApplicationSubmissionContext appContext) {
protected void submitApplication(ApplicationSubmissionContext appContext) {
LOG.info("Submitting application to the applications manager");
try {
yarnClient.submitApplication(appContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,6 @@ public void testKittenShell() throws Exception {
client.setConf(conf);
System.out.println("Running...");
assertEquals(0, client.run(new String[]{config, "distshell"}));
assertEquals(12, Files.readLines(tmpFile, Charsets.UTF_8).size());
assertEquals(14, Files.readLines(tmpFile, Charsets.UTF_8).size());
}
}
7 changes: 7 additions & 0 deletions java/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@
<artifactId>luaj-jse</artifactId>
<version>${luaj.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@
*/
package com.cloudera.kitten;

import java.nio.ByteBuffer;

import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.util.Records;

import java.nio.ByteBuffer;

/**
* Functions for constructing YARN objects from the parameter values.
*/
Expand All @@ -46,6 +45,10 @@ public ContainerLaunchContext create(ContainerLaunchParameters parameters) {
return clc;
}

public String getNodeLabelExpression(ContainerLaunchParameters parameters) {
return parameters.getNodeLabelsExpression();
}

public Resource createResource(ContainerLaunchParameters parameters) {
return parameters.getContainerResource(clusterMax);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,14 @@ public interface ContainerLaunchParameters {
* The commands to execute that start the application within the container.
*/
List<String> getCommands();

/**
* The nodeLabelsExpression that defines the types of nodes that can be allocated to the container.
* Defaults to null.
*/
String getNodeLabelsExpression();

/** The node name that is /required/ for this container. Defaults to null, which signifies no
* requirement */
String getNode();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
Expand Down Expand Up @@ -66,6 +65,7 @@ public LuaContainerLaunchParameters(LuaWrapper lv, Configuration conf,
this.extras = extras;
}


public int getCores() {
return lv.getInteger(LuaFields.CORES);
}
Expand All @@ -74,11 +74,20 @@ public int getMemory() {
return lv.getInteger(LuaFields.MEMORY);
}

@Override
public String getNodeLabelsExpression() {
return lv.isNil(LuaFields.NODE_LABELS) ? null : lv.getString(LuaFields.NODE_LABELS);
}

@Override
public Resource getContainerResource(Resource clusterMax) {
Resource rsrc = Records.newRecord(Resource.class);
rsrc.setMemory(Math.min(clusterMax.getMemory(), getMemory()));
rsrc.setVirtualCores(Math.min(clusterMax.getVirtualCores(), getCores()));
if (rsrc.getMemory() < getMemory())
LOG.warn("Memory reduced from "+getMemory()+" to cluster maximum " + rsrc.getMemory());
if (rsrc.getMemory() < getMemory())
LOG.warn("VCores reduced from "+getCores()+" to cluster maximum " + rsrc.getVirtualCores());
return rsrc;
}

Expand All @@ -92,6 +101,12 @@ public int getNumInstances() {
return lv.isNil(LuaFields.INSTANCES) ? 1 : lv.getInteger(LuaFields.INSTANCES);
}


@Override
public String getNode() {
return lv.isNil(LuaFields.NODE) ? null : lv.getString(LuaFields.NODE);
}

@Override
public Map<String, LocalResource> getLocalResources() {
Map<String, LocalResource> localResources = Maps.newHashMap();
Expand Down Expand Up @@ -123,12 +138,17 @@ public Map<String, LocalResource> getLocalResources() {
}

private LocalResource constructExtraResource(String key) {
LocalResource rsrc = Records.newRecord(LocalResource.class);
if (key.equals("job.xml") && ! localFileUris.containsKey(key))
return null;
LocalResource rsrc = Records.newRecord(LocalResource.class);
rsrc.setType(LocalResourceType.FILE);
rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
try {
Path path = new Path(localFileUris.get(key));
configureLocalResourceForPath(rsrc, path);
} catch (NullPointerException npe) {
LOG.warn("No local URI found for "+ key, npe);
return null;
} catch (IOException e) {
LOG.error("Error constructing extra local resource: " + key, e);
return null;
Expand Down Expand Up @@ -191,9 +211,10 @@ private NamedLocalResource constructResource(LuaPair lp) throws IOException {
private void configureLocalResourceForPath(LocalResource rsrc, Path path) throws IOException {
FileSystem fs = FileSystem.get(conf);
FileStatus stat = fs.getFileStatus(path);
Path p = path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
rsrc.setSize(stat.getLen());
rsrc.setTimestamp(stat.getModificationTime());
rsrc.setResource(ConverterUtils.getYarnUrlFromPath(path));
rsrc.setResource(ConverterUtils.getYarnUrlFromPath(p));
}

@Override
Expand Down Expand Up @@ -251,4 +272,6 @@ public String toCommand(LuaWrapper table) {
}
return sb.toString();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class LuaFields {
public static final String CONTAINERS = "containers";
public static final String CONTAINER = "container";
public static final String APP_NAME = "name";
public static final String APP_TYPE = "app_type";
public static final String TIMEOUT = "timeout";
public static final String USER = "user";
public static final String QUEUE = "queue";
Expand All @@ -37,7 +38,9 @@ public class LuaFields {
public static final String CORES = "cores";
public static final String MEMORY = "memory";
public static final String PRIORITY = "priority";

public static final String NODE_LABELS = "node_labels";
public static final String NODE = "node";

// For constructing commands from a LuaTable.
public static final String COMMAND_BASE = "base";
public static final String ARGS = "args";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.luaj.vm2.LoadState;
import org.luaj.vm2.LuaTable;
import org.luaj.vm2.LuaValue;
Expand Down Expand Up @@ -58,11 +60,19 @@ public LuaWrapper(String script, Map<String, Object> extras) {
env.set(e.getKey(), CoerceJavaToLua.coerce(e.getValue()));
}
InputStream luaCode = LocalDataHelper.getFileOrResource(script);
assert luaCode != null : "Lua script " + script + " not found";
LoadState.load(luaCode, script, env).call();
} catch (IOException e) {
LOG.error("Lua initialization error", e);
throw new RuntimeException(e);
}
} catch (NullPointerException e) {
LOG.error("Lua compiler error", e);
try{
List<String> lines = IOUtils.readLines( LocalDataHelper.getFileOrResource(script));
LOG.error(StringUtils.join("\n", lines));
} catch (IOException ioe) {}
throw new RuntimeException(e);
}
}

public LuaWrapper(LuaTable table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -125,6 +127,7 @@ private void copyToHdfs(String key, String localDataName) throws IOException {
Path src = new Path(localDataName);
Path dst = getPath(fs, src.getName());
InputStream data = getFileOrResource(localDataName);
assert data != null : "Could not access file/resource " + localDataName;
FSDataOutputStream os = fs.create(dst, true);
ByteStreams.copy(data, os);
os.close();
Expand Down
7 changes: 7 additions & 0 deletions java/master/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,8 @@ public interface ApplicationMasterService extends Service {
* monitoring.
*/
boolean hasRunningContainers();

int getTotalRequested();
int getTotalCompleted();
int getTotalFailures();
}
Loading