Skip to content

YARN-11226. [Federation] Add createNewReservation, submitReservation, updateReservation, deleteReservation REST APIs for Router. #5175

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Dec 22, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1062,4 +1062,93 @@ public void addOrUpdateApplicationHomeSubCluster(ApplicationId applicationId,
updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
}
}

/**
* Exists ReservationHomeSubCluster Mapping.
*
* @param reservationId reservationId
* @return true - exist, false - not exist
*/
public boolean existsReservationHomeSubCluster(ReservationId reservationId) {
try {
SubClusterId subClusterId = getReservationHomeSubCluster(reservationId);
if (subClusterId != null) {
return true;
}
} catch (YarnException e) {
LOG.warn("get homeSubCluster by reservationId = {} error.", reservationId, e);
}
return false;
}

/**
* Save Reservation And HomeSubCluster Mapping.
*
* @param reservationId reservationId
* @param homeSubCluster homeSubCluster
* @throws YarnException on failure
*/
public void addReservationHomeSubCluster(ReservationId reservationId,
ReservationHomeSubCluster homeSubCluster) throws YarnException {
try {
// persist the mapping of reservationId and the subClusterId which has
// been selected as its home
addReservationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
String msg = String.format(
"Unable to insert the ReservationId %s into the FederationStateStore.", reservationId);
throw new YarnException(msg, e);
}
}

/**
* Update Reservation And HomeSubCluster Mapping.
*
* @param subClusterId subClusterId
* @param reservationId reservationId
* @param homeSubCluster homeSubCluster
* @throws YarnException on failure
*/
public void updateReservationHomeSubCluster(SubClusterId subClusterId,
ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) throws YarnException {
try {
// update the mapping of reservationId and the home subClusterId to
// the new subClusterId we have selected
updateReservationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
SubClusterId subClusterIdInStateStore = getReservationHomeSubCluster(reservationId);
if (subClusterId == subClusterIdInStateStore) {
LOG.info("Reservation {} already submitted on SubCluster {}.", reservationId, subClusterId);
} else {
String msg = String.format(
"Unable to update the ReservationId %s into the FederationStateStore.", reservationId);
throw new YarnException(msg, e);
}
}
}

/**
* Add or Update ReservationHomeSubCluster.
*
* @param reservationId reservationId.
* @param subClusterId homeSubClusterId, this is selected by strategy.
* @param retryCount number of retries.
* @throws YarnException yarn exception.
*/
public void addOrUpdateReservationHomeSubCluster(ReservationId reservationId,
SubClusterId subClusterId, int retryCount) throws YarnException {
Boolean exists = existsReservationHomeSubCluster(reservationId);
ReservationHomeSubCluster reservationHomeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
if (!exists || retryCount == 0) {
// persist the mapping of reservationId and the subClusterId which has
// been selected as its home.
addReservationHomeSubCluster(reservationId, reservationHomeSubCluster);
} else {
// update the mapping of reservationId and the home subClusterId to
// the new subClusterId we have selected.
updateReservationHomeSubCluster(subClusterId, reservationId,
reservationHomeSubCluster);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
Expand Down Expand Up @@ -57,6 +67,8 @@ public final class RouterServerUtil {

private static final String EPOCH_PREFIX = "e";

private static final String RESERVEIDSTR_PREFIX = "reservation_";

/** Disable constructor. */
private RouterServerUtil() {
}
Expand Down Expand Up @@ -494,6 +506,15 @@ public static String getRenewerForToken(Token<RMDelegationTokenIdentifier> token
? token.decodeIdentifier().getRenewer().toString() : user.getShortUserName();
}

/**
* Set User information.
*
* If the username is empty, we will use the Yarn Router user directly.
* Do not create a proxy user if userName matches the userName on current UGI.
*
* @param userName userName.
* @return UserGroupInformation.
*/
public static UserGroupInformation setupUser(final String userName) {
UserGroupInformation user = null;
try {
Expand All @@ -513,7 +534,94 @@ public static UserGroupInformation setupUser(final String userName) {
return user;
} catch (IOException e) {
throw RouterServerUtil.logAndReturnYarnRunTimeException(e,
"Error while creating Router RMAdmin Service for user : %s.", user);
"Error while creating Router Service for user : %s.", user);
}
}

/**
* Check reservationId is accurate.
*
* We need to ensure that reservationId cannot be empty and
* can be converted to ReservationId object normally.
*
* @param reservationId reservationId.
* @throws IllegalArgumentException If the format of the reservationId is not accurate,
* an IllegalArgumentException needs to be thrown.
*/
@Public
@Unstable
public static void validateReservationId(String reservationId) throws IllegalArgumentException {

if (reservationId == null || reservationId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the reservationId is empty or null.");
}

if (!reservationId.startsWith(RESERVEIDSTR_PREFIX)) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}

String[] resFields = reservationId.split("_");
if (resFields.length != 3) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}

String clusterTimestamp = resFields[1];
String id = resFields[2];
if (!NumberUtils.isDigits(id) || !NumberUtils.isDigits(clusterTimestamp)) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}
}

/**
* Convert ReservationDefinitionInfo to ReservationDefinition.
*
* @param definitionInfo ReservationDefinitionInfo Object.
* @return ReservationDefinition.
*/
public static ReservationDefinition convertReservationDefinition(
ReservationDefinitionInfo definitionInfo) {
if (definitionInfo == null || definitionInfo.getReservationRequests() == null
|| definitionInfo.getReservationRequests().getReservationRequest() == null
|| definitionInfo.getReservationRequests().getReservationRequest().isEmpty()) {
throw new RuntimeException("definitionInfo Or ReservationRequests is Null.");
}

// basic variable
long arrival = definitionInfo.getArrival();
long deadline = definitionInfo.getDeadline();

// ReservationRequests reservationRequests
String name = definitionInfo.getReservationName();
String recurrenceExpression = definitionInfo.getRecurrenceExpression();
Priority priority = Priority.newInstance(definitionInfo.getPriority());

// reservation requests info
List<ReservationRequest> reservationRequestList = new ArrayList<>();

ReservationRequestsInfo reservationRequestsInfo = definitionInfo.getReservationRequests();

List<ReservationRequestInfo> reservationRequestInfos =
reservationRequestsInfo.getReservationRequest();

for (ReservationRequestInfo resRequestInfo : reservationRequestInfos) {
ResourceInfo resourceInfo = resRequestInfo.getCapability();
Resource capability =
Resource.newInstance(resourceInfo.getMemorySize(), resourceInfo.getvCores());
ReservationRequest reservationRequest = ReservationRequest.newInstance(capability,
resRequestInfo.getNumContainers(), resRequestInfo.getMinConcurrency(),
resRequestInfo.getDuration());
reservationRequestList.add(reservationRequest);
}

ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values();
ReservationRequestInterpreter reservationRequestInterpreter =
values[reservationRequestsInfo.getReservationRequestsInterpreter()];
ReservationRequests reservationRequests = ReservationRequests.newInstance(
reservationRequestList, reservationRequestInterpreter);

ReservationDefinition definition = ReservationDefinition.newInstance(
arrival, deadline, reservationRequests, name, recurrenceExpression, priority);

return definition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

import java.io.IOException;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;

/**
* Extends the RequestInterceptor class and provides common functionality which
Expand Down Expand Up @@ -68,7 +66,7 @@ public Configuration getConf() {
*/
@Override
public void init(String userName) {
setupUser(userName);
this.user = RouterServerUtil.setupUser(userName);
if (this.nextInterceptor != null) {
this.nextInterceptor.init(userName);
}
Expand All @@ -92,34 +90,6 @@ public RESTRequestInterceptor getNextInterceptor() {
return this.nextInterceptor;
}

/**
* Set User information.
*
* If the username is empty, we will use the Yarn Router user directly.
* Do not create a proxy user if user name matches the user name on current UGI.
* @param userName userName.
*/
private void setupUser(final String userName) {
try {
if (userName == null || userName.isEmpty()) {
user = UserGroupInformation.getCurrentUser();
} else if (UserGroupInformation.isSecurityEnabled()) {
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
} else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,
UserGroupInformation.getCurrentUser());
}
} catch (IOException e) {
String message = "Error while creating Router RMAdmin Service for user:";
if (user != null) {
message += ", user: " + user;
}
throw new YarnRuntimeException(message, e);
}
}

public UserGroupInformation getUser() {
return user;
}
Expand Down
Loading