Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-10885. Make FederationStateStoreFacade#getApplicationHomeSubCluster use JCache. #4701

Merged
merged 11 commits into from
Aug 15, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public final class FederationStateStoreFacade {
private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
private static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
"getPoliciesConfigurations";
private static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID =
"getApplicationHomeSubCluster";

private static final FederationStateStoreFacade FACADE =
new FederationStateStoreFacade();
Expand Down Expand Up @@ -376,10 +378,19 @@ public void updateApplicationHomeSubCluster(
*/
public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
throws YarnException {
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(
try {
if (isCachingEnabled()) {
SubClusterId value = SubClusterId.class.cast(
cache.get(buildGetApplicationHomeSubClusterRequest(appId)));
return value;
} else {
GetApplicationHomeSubClusterResponse response = stateStore.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(appId));
return response.getApplicationHomeSubCluster().getHomeSubCluster();
return response.getApplicationHomeSubCluster().getHomeSubCluster();
}
} catch (Throwable ex) {
throw new YarnException(ex);
}
}

/**
Expand Down Expand Up @@ -513,6 +524,26 @@ public Map<String, SubClusterPolicyConfiguration> invoke(
return cacheRequest;
}

private Object buildGetApplicationHomeSubClusterRequest(ApplicationId applicationId) {
final String cacheKey = buildCacheKey(getClass().getSimpleName(),
GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, applicationId.toString());
CacheRequest<String, SubClusterId> cacheRequest = new CacheRequest<>(
cacheKey,
input -> {

GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(applicationId);
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(request);

ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster();
SubClusterId subClusterId = appHomeSubCluster.getHomeSubCluster();

return subClusterId;
});
return cacheRequest;
}

protected String buildCacheKey(String typeName, String methodName,
String argName) {
StringBuilder buffer = new StringBuilder();
Expand Down Expand Up @@ -560,7 +591,7 @@ private static class CacheRequest<K, V> {
private K key;
private Func<K, V> func;

public CacheRequest(K key, Func<K, V> func) {
CacheRequest(K key, Func<K, V> func) {
this.key = key;
this.func = func;
}
Expand Down Expand Up @@ -610,6 +641,15 @@ protected interface Func<T, TResult> {
TResult invoke(T input) throws Exception;
}

@VisibleForTesting
public Cache<Object, Object> getCache() {
return cache;
}

@VisibleForTesting
protected Object getAppHomeSubClusterCacheRequest(ApplicationId applicationId) {
return buildGetApplicationHomeSubClusterRequest(applicationId);
}

@VisibleForTesting
public FederationStateStore getStateStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import javax.cache.Cache;

/**
* Unit tests for FederationStateStoreFacade.
*/
Expand All @@ -64,12 +66,14 @@ public static Collection<Boolean[]> getParameters() {
private FederationStateStoreTestUtil stateStoreTestUtil;
private FederationStateStoreFacade facade =
FederationStateStoreFacade.getInstance();
private Boolean isCachingEnabled;

public TestFederationStateStoreFacade(Boolean isCachingEnabled) {
conf = new Configuration();
if (!(isCachingEnabled.booleanValue())) {
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
}
this.isCachingEnabled = isCachingEnabled;
}

@Before
Expand Down Expand Up @@ -206,4 +210,26 @@ public void testAddApplicationHomeSubCluster() throws YarnException {
Assert.assertEquals(subClusterId1, result);
}

@Test
public void testGetApplicationHomeSubClusterCache() throws YarnException {
ApplicationId appId = ApplicationId.newInstance(clusterTs, numApps + 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("Home1");

ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(appId, subClusterId1);
SubClusterId subClusterIdAdd = facade.addApplicationHomeSubCluster(appHomeSubCluster);

SubClusterId subClusterIdByFacade = facade.getApplicationHomeSubCluster(appId);
Assert.assertEquals(subClusterIdByFacade, subClusterIdAdd);
Assert.assertEquals(subClusterId1, subClusterIdAdd);

if (isCachingEnabled.booleanValue()) {
Cache<Object, Object> cache = facade.getCache();
Object cacheKey = facade.getAppHomeSubClusterCacheRequest(appId);
Object subClusterIdByCache = cache.get(cacheKey);
Assert.assertEquals(subClusterIdByFacade, subClusterIdByCache);
Assert.assertEquals(subClusterId1, subClusterIdByCache);
}
}

}