Skip to content

Commit

Permalink
YARN-10885. Make FederationStateStoreFacade#getApplicationHomeSubClus…
Browse files Browse the repository at this point in the history
…ter use JCache. (#4701)
  • Loading branch information
slfan1989 authored Aug 15, 2022
1 parent b1d4af2 commit eff3b8c
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public final class FederationStateStoreFacade {
private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
private static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
"getPoliciesConfigurations";
private static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID =
"getApplicationHomeSubCluster";

private static final FederationStateStoreFacade FACADE =
new FederationStateStoreFacade();
Expand Down Expand Up @@ -382,10 +384,19 @@ public void updateApplicationHomeSubCluster(
*/
public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
throws YarnException {
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(
try {
if (isCachingEnabled()) {
SubClusterId value = SubClusterId.class.cast(
cache.get(buildGetApplicationHomeSubClusterRequest(appId)));
return value;
} else {
GetApplicationHomeSubClusterResponse response = stateStore.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(appId));
return response.getApplicationHomeSubCluster().getHomeSubCluster();
return response.getApplicationHomeSubCluster().getHomeSubCluster();
}
} catch (Throwable ex) {
throw new YarnException(ex);
}
}

/**
Expand Down Expand Up @@ -548,6 +559,26 @@ public Map<String, SubClusterPolicyConfiguration> invoke(
return cacheRequest;
}

private Object buildGetApplicationHomeSubClusterRequest(ApplicationId applicationId) {
final String cacheKey = buildCacheKey(getClass().getSimpleName(),
GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, applicationId.toString());
CacheRequest<String, SubClusterId> cacheRequest = new CacheRequest<>(
cacheKey,
input -> {

GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(applicationId);
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(request);

ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster();
SubClusterId subClusterId = appHomeSubCluster.getHomeSubCluster();

return subClusterId;
});
return cacheRequest;
}

protected String buildCacheKey(String typeName, String methodName,
String argName) {
StringBuilder buffer = new StringBuilder();
Expand Down Expand Up @@ -645,6 +676,15 @@ protected interface Func<T, TResult> {
TResult invoke(T input) throws Exception;
}

@VisibleForTesting
public Cache<Object, Object> getCache() {
return cache;
}

@VisibleForTesting
protected Object getAppHomeSubClusterCacheRequest(ApplicationId applicationId) {
return buildGetApplicationHomeSubClusterRequest(applicationId);
}

@VisibleForTesting
public FederationStateStore getStateStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import javax.cache.Cache;

/**
* Unit tests for FederationStateStoreFacade.
*/
Expand All @@ -64,12 +66,14 @@ public static Collection<Boolean[]> getParameters() {
private FederationStateStoreTestUtil stateStoreTestUtil;
private FederationStateStoreFacade facade =
FederationStateStoreFacade.getInstance();
private Boolean isCachingEnabled;

public TestFederationStateStoreFacade(Boolean isCachingEnabled) {
conf = new Configuration();
if (!(isCachingEnabled.booleanValue())) {
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
}
this.isCachingEnabled = isCachingEnabled;
}

@Before
Expand Down Expand Up @@ -206,4 +210,26 @@ public void testAddApplicationHomeSubCluster() throws YarnException {
Assert.assertEquals(subClusterId1, result);
}

@Test
public void testGetApplicationHomeSubClusterCache() throws YarnException {
ApplicationId appId = ApplicationId.newInstance(clusterTs, numApps + 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("Home1");

ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(appId, subClusterId1);
SubClusterId subClusterIdAdd = facade.addApplicationHomeSubCluster(appHomeSubCluster);

SubClusterId subClusterIdByFacade = facade.getApplicationHomeSubCluster(appId);
Assert.assertEquals(subClusterIdByFacade, subClusterIdAdd);
Assert.assertEquals(subClusterId1, subClusterIdAdd);

if (isCachingEnabled.booleanValue()) {
Cache<Object, Object> cache = facade.getCache();
Object cacheKey = facade.getAppHomeSubClusterCacheRequest(appId);
Object subClusterIdByCache = cache.get(cacheKey);
Assert.assertEquals(subClusterIdByFacade, subClusterIdByCache);
Assert.assertEquals(subClusterId1, subClusterIdByCache);
}
}

}

0 comments on commit eff3b8c

Please sign in to comment.