Skip to content

YARN-9013. [BackPort] [GPG] fix order of steps cleaning Registry entries in ApplicationCleaner. #6147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5132,10 +5132,10 @@

<property>
<name>yarn.router.interceptor.user-thread-pool.keep-alive-time</name>
<value>0s</value>
<value>30s</value>
<description>
This property sets the keepAliveTime of the interceptor's thread pool.
Default is 0s.
Default is 30s.
</description>
</property>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.lang3.time.DurationFormatUtils;
Expand Down Expand Up @@ -95,6 +94,10 @@ public GPGContext getGPGContext() {
return this.gpgContext;
}

/**
 * Exposes the registry client held by this cleaner.
 *
 * @return the FederationRegistryClient stored in this instance
 */
public FederationRegistryClient getRegistryClient() {
  return registryClient;
}

/**
* Query router for applications.
*
Expand Down Expand Up @@ -152,18 +155,6 @@ public Set<ApplicationId> getRouterKnownApplications() throws YarnException {
+ " success Router queries after " + totalAttemptCount + " retries");
}

/**
 * Removes from the registry every application entry that is not part of
 * the given set of known applications.
 *
 * @param knownApps applications that must be kept in the registry
 */
protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) {
  List<String> registryEntries = this.registryClient.getAllApplications();
  LOG.info("Got {} existing apps in registry.", registryEntries.size());
  for (String entry : registryEntries) {
    ApplicationId id = ApplicationId.fromString(entry);
    if (knownApps.contains(id)) {
      // Still known: leave its registry entry alone
      continue;
    }
    LOG.info("removing finished application entry for {}", entry);
    this.registryClient.removeAppFromRegistry(id, true);
  }
}

/**
 * Entry point of one cleaner pass. Declared abstract here, so each concrete
 * cleaner has to supply its own cleanup procedure.
 */
@Override
public abstract void run();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
Expand All @@ -45,33 +46,49 @@ public void run() {
LOG.info("Application cleaner run at time {}", now);

FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
Set<ApplicationId> candidates = new HashSet<>();
try {
// Get the candidate list from StateStore before calling router
Set<ApplicationId> allStateStoreApps = new HashSet<>();
List<ApplicationHomeSubCluster> response =
facade.getApplicationsHomeSubCluster();
for (ApplicationHomeSubCluster app : response) {
candidates.add(app.getApplicationId());
allStateStoreApps.add(app.getApplicationId());
}
LOG.info("{} app entries in FederationStateStore", candidates.size());
LOG.info("{} app entries in FederationStateStore", allStateStoreApps.size());

// Snapshot the Registry entries before calling the router, so that apps
// registered while the router query is in flight are not cleanup candidates
List<String> allRegistryApps = getRegistryClient().getAllApplications();
// Report the registry count itself, not the StateStore count
LOG.info("{} app entries in FederationRegistry", allRegistryApps.size());

// Get the list of known apps from Router
Set<ApplicationId> routerApps = getRouterKnownApplications();
LOG.info("{} known applications from Router", routerApps.size());

candidates.removeAll(routerApps);
LOG.info("Deleting {} applications from statestore", candidates.size());
if (LOG.isDebugEnabled()) {
LOG.debug("Apps to delete: {}.", candidates.stream().map(Object::toString)
.collect(Collectors.joining(",")));
}
for (ApplicationId appId : candidates) {
// Clean up StateStore entries
Set<ApplicationId> toDelete =
Sets.difference(allStateStoreApps, routerApps);

LOG.info("Deleting {} applications from statestore", toDelete.size());
// Joining every application id is only worth the cost when debug output
// is enabled; the argument would otherwise be built and then discarded
if (LOG.isDebugEnabled()) {
  LOG.debug("Apps to delete: {}.",
      toDelete.stream().map(Object::toString).collect(Collectors.joining(",")));
}

for (ApplicationId appId : toDelete) {
try {
LOG.debug("Deleting {} from statestore ", appId);
facade.deleteApplicationHomeSubCluster(appId);
} catch (Exception e) {
LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e);
}
}
// Clean up registry entries
cleanupAppRecordInRegistry(routerApps);

// Clean up Registry entries
for (String app : allRegistryApps) {
ApplicationId appId = ApplicationId.fromString(app);
if (!routerApps.contains(appId)) {
LOG.debug("removing finished application entry for {}", app);
getRegistryClient().removeAppFromRegistry(appId, true);
}
}
} catch (Throwable e) {
LOG.error("Application cleaner started at time {} fails. ", now, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
Expand All @@ -63,6 +64,8 @@ public class TestDefaultApplicationCleaner {
// The list of applications returned by mocked router
private Set<ApplicationId> routerAppIds;

private ApplicationId appIdToAddConcurrently;

@Before
public void setup() throws Exception {
conf = new YarnConfiguration();
Expand Down Expand Up @@ -111,6 +114,7 @@ public void setup() throws Exception {
new Token<AMRMTokenIdentifier>());
}
Assert.assertEquals(3, registryClient.getAllApplications().size());
appIdToAddConcurrently = null;
}

@After
Expand Down Expand Up @@ -159,7 +163,42 @@ public class TestableDefaultApplicationCleaner
extends DefaultApplicationCleaner {
/**
 * Mocked router query. When appIdToAddConcurrently is set, that application
 * is first written to the state store and the registry, imitating an app
 * that shows up while the cleaner is waiting for the router.
 */
@Override
public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
  // Nothing to inject: just hand back the mocked router result
  if (appIdToAddConcurrently == null) {
    return routerAppIds;
  }

  SubClusterId homeSubCluster = SubClusterId.newInstance("MySubClusterId");
  try {
    stateStore.addApplicationHomeSubCluster(
        AddApplicationHomeSubClusterRequest.newInstance(
            ApplicationHomeSubCluster.newInstance(appIdToAddConcurrently, homeSubCluster)));
  } catch (YarnException e) {
    throw new YarnRuntimeException(e);
  }
  registryClient.writeAMRMTokenForUAM(appIdToAddConcurrently, homeSubCluster.toString(),
      new Token<>());
  return routerAppIds;
}
}

/**
 * An application that appears in the state store and the registry while the
 * cleaner is querying the router must survive the cleanup pass.
 */
@Test
public void testConcurrentNewApp() throws YarnException {
  appIdToAddConcurrently = ApplicationId.newInstance(1, 1);

  appCleaner.run();

  // The concurrently added app should still be in the state store
  GetApplicationsHomeSubClusterRequest appHomeSubClusterRequest =
      GetApplicationsHomeSubClusterRequest.newInstance();
  GetApplicationsHomeSubClusterResponse applicationsHomeSubCluster =
      stateStore.getApplicationsHomeSubCluster(appHomeSubClusterRequest);
  Assert.assertNotNull(applicationsHomeSubCluster);
  List<ApplicationHomeSubCluster> appsHomeSubClusters =
      applicationsHomeSubCluster.getAppsHomeSubClusters();
  Assert.assertNotNull(appsHomeSubClusters);
  Assert.assertEquals(1, appsHomeSubClusters.size());
  // A bare count would also pass if the wrong entry survived; check identity
  Assert.assertEquals(appIdToAddConcurrently,
      appsHomeSubClusters.get(0).getApplicationId());

  // The concurrently added app should still be in the registry as well
  List<String> registryApps = registryClient.getAllApplications();
  Assert.assertEquals(1, registryApps.size());
  Assert.assertEquals(appIdToAddConcurrently,
      ApplicationId.fromString(registryApps.get(0)));
}
}