Skip to content

YARN-11183. Federation: Remove outdated ApplicationHomeSubCluster in … #4450

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,11 @@ BEGIN
WHERE applicationId = applicationID_IN;
END //

CREATE PROCEDURE sp_getApplicationsHomeSubCluster()
CREATE PROCEDURE sp_getApplicationsHomeSubCluster(IN homeSubCluster_IN varchar(256))
BEGIN
SELECT applicationId, homeSubCluster
FROM applicationsHomeSubCluster;
FROM applicationsHomeSubCluster
WHERE homeSubCluster_IN='' or homeSubCluster=homeSubCluster_IN;
END //

CREATE PROCEDURE sp_deleteApplicationHomeSubCluster(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,14 @@ IF OBJECT_ID ( '[sp_getApplicationsHomeSubCluster]', 'P' ) IS NOT NULL
GO

CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster]
@homeSubCluster VARCHAR(256)
AS BEGIN
DECLARE @errorMessage nvarchar(4000)

BEGIN TRY
SELECT [applicationId], [homeSubCluster], [createTime]
FROM [dbo].[applicationsHomeSubCluster]
WHERE @homeSubCluster = '' or [homeSubCluster] = @homeSubCluster
END TRY

BEGIN CATCH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,8 @@ protected boolean isEventThreadWaiting() {
return eventHandlingThread.getState() == Thread.State.WAITING;
}

protected boolean isDrained() {
@VisibleForTesting
public boolean isDrained() {
return drained;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void handle(Event event) {
}

@Override
protected boolean isDrained() {
public boolean isDrained() {
synchronized (mutex) {
return drained;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,12 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException {
List<ApplicationHomeSubCluster> result =
new ArrayList<ApplicationHomeSubCluster>();
SubClusterId subClusterId = request.getSubClusterId();
for (Entry<ApplicationId, SubClusterId> e : applications.entrySet()) {
result
.add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue()));
if (subClusterId == null || subClusterId.equals(e.getValue())) {
result.add(
ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can e.getValue() be null?

}
}

GetApplicationsHomeSubClusterResponse.newInstance(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public class SQLFederationStateStore implements FederationStateStore {
"{call sp_getApplicationHomeSubCluster(?, ?)}";

private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER =
"{call sp_getApplicationsHomeSubCluster()}";
"{call sp_getApplicationsHomeSubCluster(?)}";

private static final String CALL_SP_SET_POLICY_CONFIGURATION =
"{call sp_setPolicyConfiguration(?, ?, ?, ?)}";
Expand Down Expand Up @@ -725,6 +725,10 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
try {
cstmt = getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);

// Set the parameters for the stored procedure
SubClusterId subClusterId = request.getSubClusterId();
cstmt.setString(1, subClusterId == null ? "" : subClusterId.getId());

// Execute the query
long startTime = clock.getTime();
rs = cstmt.executeQuery();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,17 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException {
List<ApplicationHomeSubCluster> result = new ArrayList<>();
SubClusterId subClusterId = request.getSubClusterId();

try {
for (String child : zkManager.getChildren(appsZNode)) {
ApplicationId appId = ApplicationId.fromString(child);
SubClusterId homeSubCluster = getApp(appId);
ApplicationHomeSubCluster app =
ApplicationHomeSubCluster.newInstance(appId, homeSubCluster);
result.add(app);
if (subClusterId == null || subClusterId.equals(homeSubCluster)) {
ApplicationHomeSubCluster app =
ApplicationHomeSubCluster.newInstance(appId, homeSubCluster);
result.add(app);
}
}
} catch (Exception e) {
String errMsg = "Cannot get apps: " + e.getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.store.records;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;

Expand All @@ -37,4 +38,21 @@ public static GetApplicationsHomeSubClusterRequest newInstance() {
return request;
}

@Private
@Unstable
public static GetApplicationsHomeSubClusterRequest newInstance(
SubClusterId subClusterId) {
GetApplicationsHomeSubClusterRequest request =
Records.newRecord(GetApplicationsHomeSubClusterRequest.class);
request.setSubClusterId(subClusterId);
return request;
}

@Public
@Unstable
public abstract SubClusterId getSubClusterId();

@Public
@Unstable
public abstract void setSubClusterId(SubClusterId subClusterId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ public String toString() {
@Override
public ApplicationId getApplicationId() {
ApplicationHomeSubClusterProtoOrBuilder p = viaProto ? proto : builder;
if (this.applicationId != null) {
return this.applicationId;
}
if (!p.hasApplicationId()) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsHomeSubClusterRequestProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsHomeSubClusterRequestProtoOrBuilder;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;

import org.apache.hadoop.thirdparty.protobuf.TextFormat;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

/**
* Protocol buffer based implementation of
Expand All @@ -37,6 +40,7 @@ public class GetApplicationsHomeSubClusterRequestPBImpl
GetApplicationsHomeSubClusterRequestProto.getDefaultInstance();
private GetApplicationsHomeSubClusterRequestProto.Builder builder = null;
private boolean viaProto = false;
private SubClusterId subClusterId = null;

public GetApplicationsHomeSubClusterRequestPBImpl() {
builder = GetApplicationsHomeSubClusterRequestProto.newBuilder();
Expand All @@ -49,11 +53,34 @@ public GetApplicationsHomeSubClusterRequestPBImpl(
}

public GetApplicationsHomeSubClusterRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}

private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}

private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetApplicationsHomeSubClusterRequestProto.newBuilder(proto);
}
viaProto = false;
}

private void mergeLocalToBuilder() {
if (this.subClusterId != null) {
builder.setSubClusterId(convertToProtoFormat(this.subClusterId));
}
}

@Override
public int hashCode() {
return getProto().hashCode();
Expand All @@ -75,4 +102,38 @@ public String toString() {
return TextFormat.shortDebugString(getProto());
}

@Override
public SubClusterId getSubClusterId() {
GetApplicationsHomeSubClusterRequestProtoOrBuilder p =
viaProto ? proto : builder;
if (subClusterId != null) {
return subClusterId;
}
if (!p.hasSubClusterId()) {
return null;
}
this.subClusterId = convertFromProtoFormat(p.getSubClusterId());

return this.subClusterId;
}

@Override
public void setSubClusterId(SubClusterId subClusterId) {
maybeInitBuilder();
if (subClusterId == null) {
builder.clearSubClusterId();
return;
}
this.subClusterId = subClusterId;
}

private SubClusterId convertFromProtoFormat(
SubClusterIdProto subClusterIdProto) {
return new SubClusterIdPBImpl(subClusterIdProto);
}

private SubClusterIdProto convertToProtoFormat(SubClusterId appId) {
return ((SubClusterIdPBImpl) appId).getProto();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ message GetApplicationHomeSubClusterResponseProto {
}

message GetApplicationsHomeSubClusterRequestProto {

optional SubClusterIdProto sub_cluster_id = 1;
}

message GetApplicationsHomeSubClusterResponseProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,18 @@ public void testGetApplicationsHomeSubCluster() throws Exception {
Assert.assertEquals(2, result.getAppsHomeSubClusters().size());
Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc1));
Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc2));

getRequest = GetApplicationsHomeSubClusterRequest
.newInstance(subClusterId1);
result = stateStore.getApplicationsHomeSubCluster(getRequest);
Assert.assertEquals(1, result.getAppsHomeSubClusters().size());
Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc1));

getRequest = GetApplicationsHomeSubClusterRequest
.newInstance(subClusterId2);
result = stateStore.getApplicationsHomeSubCluster(getRequest);
Assert.assertEquals(1, result.getAppsHomeSubClusters().size());
Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc2));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,14 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
+ " WHERE applicationId = applicationID_IN; END";

private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER =
"CREATE PROCEDURE sp_getApplicationsHomeSubCluster()"
"CREATE PROCEDURE sp_getApplicationsHomeSubCluster("
+ " IN homeSubCluster_IN varchar(256))"
+ " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
+ " DECLARE result CURSOR FOR"
+ " SELECT applicationId, homeSubCluster"
+ " FROM applicationsHomeSubCluster; OPEN result; END";
+ " FROM applicationsHomeSubCluster"
+ " WHERE homeSubCluster_IN='' or homeSubCluster=homeSubCluster_IN;"
+ " OPEN result; END";

private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER =
"CREATE PROCEDURE sp_deleteApplicationHomeSubCluster("
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
Expand Down Expand Up @@ -90,6 +91,7 @@ public class RMActiveServiceContext {
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
private FederationStateStoreService fedStateStoreService = null;
private ContainerAllocationExpirer containerAllocationExpirer;
private DelegationTokenRenewer delegationTokenRenewer;
private AMRMTokenSecretManager amRMTokenSecretManager;
Expand Down Expand Up @@ -170,6 +172,13 @@ public void setStateStore(RMStateStore store) {
stateStore = store;
}

@Private
@Unstable
public void setFederationStateStoreService(
FederationStateStoreService federationStateStoreService) {
fedStateStoreService = federationStateStoreService;
}

@Private
@Unstable
public ClientRMService getClientRMService() {
Expand All @@ -194,6 +203,12 @@ public RMStateStore getStateStore() {
return stateStore;
}

@Private
@Unstable
public FederationStateStoreService getFederationStateStoreService() {
return fedStateStoreService;
}

@Private
@Unstable
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer;
import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreRemoveAppHomeSubClusterEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -359,6 +361,11 @@ protected synchronized void checkAppNumCompletedLimit() {
+ " from memory: ");
rmContext.getRMApps().remove(removeId);
this.applicationACLsManager.removeApplication(removeId);
// Remove application from federation state store
if (HAUtil.isFederationEnabled(conf)) {
rmContext.getFederationStateStoreService().getEventHandler().handle(
new FederationStateStoreRemoveAppHomeSubClusterEvent(removeId));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
Expand Down Expand Up @@ -203,4 +204,6 @@ void setMultiNodeSortingManager(
long getTokenSequenceNo();

void incrTokenSequenceNo();

FederationStateStoreService getFederationStateStoreService();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
Expand Down Expand Up @@ -678,4 +679,9 @@ public long getTokenSequenceNo() {
public void incrTokenSequenceNo() {
this.activeServiceContext.incrTokenSequenceNo();
}

@Override
public FederationStateStoreService getFederationStateStoreService() {
return activeServiceContext.getFederationStateStoreService();
}
}
Loading