Skip to content

Commit

Permalink
YARN-11223. [Federation] Add getAppPriority, updateApplicationPriorit…
Browse files Browse the repository at this point in the history
…y REST APIs for Router. (#4733)
  • Loading branch information
slfan1989 authored Aug 15, 2022
1 parent d0fdb1d commit ab88e4b
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1261,14 +1261,52 @@ public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId)
@Override
public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
throws AuthorizationException {
throw new NotImplementedException("Code is not implemented");

if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}

try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppPriority(hsr, appId);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the getAppPriority appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppPriority Failed.", e);
}

return null;
}

@Override
public Response updateApplicationPriority(AppPriority targetPriority,
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
throw new NotImplementedException("Code is not implemented");

if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}

if (targetPriority == null) {
throw new IllegalArgumentException("Parameter error, the targetPriority is empty or null.");
}

try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.updateApplicationPriority(targetPriority, hsr, appId);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the updateApplicationPriority appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("updateApplicationPriority Failed.", e);
}

return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
Expand Down Expand Up @@ -579,4 +580,45 @@ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout, HttpServletR

return Response.status(Status.OK).entity(result).build();
}

@Override
public Response updateApplicationPriority(AppPriority targetPriority, HttpServletRequest hsr,
String appId) throws YarnException, InterruptedException, IOException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}

ApplicationId applicationId = ApplicationId.fromString(appId);
if (targetPriority == null) {
return Response.status(Status.BAD_REQUEST).build();
}

if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}

ApplicationReport appReport = applicationMap.get(applicationId);
Priority newPriority = Priority.newInstance(targetPriority.getPriority());
appReport.setPriority(newPriority);

return Response.status(Status.OK).entity(targetPriority).build();
}

@Override
public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
throws AuthorizationException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}

ApplicationId applicationId = ApplicationId.fromString(appId);

if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationReport appReport = applicationMap.get(applicationId);
Priority priority = appReport.getPriority();

return new AppPriority(priority.getPriority());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
Expand Down Expand Up @@ -855,4 +856,48 @@ public void testUpdateApplicationTimeout() throws IOException, InterruptedExcept
Assert.assertEquals(paramAppTimeOut.getTimeoutType(), entity.getTimeoutType());
Assert.assertEquals(paramAppTimeOut.getRemainingTimeInSec(), entity.getRemainingTimeInSec());
}

@Test
public void testUpdateApplicationPriority() throws IOException, InterruptedException,
YarnException {

// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
context.setPriority(20);

// Submit the application we are going to kill later
Assert.assertNotNull(interceptor.submitApplication(context, null));

int iPriority = 10;
// Set Priority for application
Response response = interceptor.updateApplicationPriority(
new AppPriority(iPriority), null, appId.toString());

Assert.assertNotNull(response);
AppPriority entity = (AppPriority) response.getEntity();
Assert.assertNotNull(entity);
Assert.assertEquals(iPriority, entity.getPriority());
}

@Test
public void testGetAppPriority() throws IOException, InterruptedException,
YarnException {

// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
int priority = 40;
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
context.setPriority(priority);

// Submit the application we are going to kill later
Assert.assertNotNull(interceptor.submitApplication(context, null));

// Set Priority for application
AppPriority appPriority = interceptor.getAppPriority(null, appId.toString());
Assert.assertNotNull(appPriority);
Assert.assertEquals(priority, appPriority.getPriority());
}
}

0 comments on commit ab88e4b

Please sign in to comment.