Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-10885. Make FederationStateStoreFacade#getApplicationHomeSubCluster use JCache. #4701

Merged
merged 11 commits into from
Aug 15, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public final class FederationStateStoreFacade {
private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
private static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
"getPoliciesConfigurations";
private static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID =
"getApplicationHomeSubCluster";

private static final FederationStateStoreFacade FACADE =
new FederationStateStoreFacade();
Expand Down Expand Up @@ -376,10 +378,23 @@ public void updateApplicationHomeSubCluster(
*/
public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
throws YarnException {
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(
try {
if (isCachingEnabled()) {
Object value =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fits in one line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your help reviewing the code, I will fix it.

cache.get(buildGetApplicationHomeSubClusterRequest(appId));
if (value instanceof SubClusterId) {
return (SubClusterId) value;
} else {
throw new YarnException("Cannot be converted to SubClusterId.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add the type of the original.

Copy link
Contributor Author

@slfan1989 slfan1989 Aug 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part of the code does need cast, I simplified the code.

The main reasons are as follows:

The key and value used by the Cache defined here are both Object types.

private Cache<Object, Object> cache;

}
} else {
GetApplicationHomeSubClusterResponse response = stateStore.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(appId));
return response.getApplicationHomeSubCluster().getHomeSubCluster();
return response.getApplicationHomeSubCluster().getHomeSubCluster();
}
} catch (Throwable ex) {
throw new YarnException(ex);
}
}

/**
Expand Down Expand Up @@ -513,6 +528,26 @@ public Map<String, SubClusterPolicyConfiguration> invoke(
return cacheRequest;
}

private Object buildGetApplicationHomeSubClusterRequest(ApplicationId applicationId) {
final String cacheKey = buildCacheKey(getClass().getSimpleName(),
GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, applicationId.toString());
CacheRequest<String, SubClusterId> cacheRequest =
new CacheRequest<>(
cacheKey,
input -> {
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(applicationId));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract the request and probably do indentation like:

    CacheRequest<String, SubClusterId> cacheRequest = new CacheRequest<>(
        cacheKey,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.


ApplicationHomeSubCluster applicationHomeSubCluster =
response.getApplicationHomeSubCluster();
SubClusterId subClusterId = applicationHomeSubCluster.getHomeSubCluster();

return subClusterId;
});
return cacheRequest;
}

protected String buildCacheKey(String typeName, String methodName,
String argName) {
StringBuilder buffer = new StringBuilder();
Expand Down Expand Up @@ -560,7 +595,7 @@ private static class CacheRequest<K, V> {
private K key;
private Func<K, V> func;

public CacheRequest(K key, Func<K, V> func) {
CacheRequest(K key, Func<K, V> func) {
this.key = key;
this.func = func;
}
Expand Down Expand Up @@ -609,4 +644,14 @@ public boolean equals(Object obj) {
protected interface Func<T, TResult> {
TResult invoke(T input) throws Exception;
}

@VisibleForTesting
public Cache<Object, Object> getCache() {
return cache;
}

@VisibleForTesting
protected Object getAppHomeSubClusterCacheRequest(ApplicationId applicationId) {
return buildGetApplicationHomeSubClusterRequest(applicationId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import javax.cache.Cache;

/**
* Unit tests for FederationStateStoreFacade.
*/
Expand All @@ -64,12 +66,14 @@ public static Collection<Boolean[]> getParameters() {
private FederationStateStoreTestUtil stateStoreTestUtil;
private FederationStateStoreFacade facade =
FederationStateStoreFacade.getInstance();
private Boolean isCachingEnabled;

public TestFederationStateStoreFacade(Boolean isCachingEnabled) {
conf = new Configuration();
if (!(isCachingEnabled.booleanValue())) {
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
}
this.isCachingEnabled = isCachingEnabled;
}

@Before
Expand Down Expand Up @@ -206,4 +210,27 @@ public void testAddApplicationHomeSubCluster() throws YarnException {
Assert.assertEquals(subClusterId1, result);
}

@Test
public void testGetApplicationHomeSubClusterCache() throws YarnException {
ApplicationId appId = ApplicationId.newInstance(clusterTs, numApps + 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("Home1");

ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(appId, subClusterId1);
SubClusterId subClusterIdAdd =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.

facade.addApplicationHomeSubCluster(appHomeSubCluster);

SubClusterId subClusterIdByFacade = facade.getApplicationHomeSubCluster(appId);
Assert.assertEquals(subClusterIdByFacade, subClusterIdAdd);
Assert.assertEquals(subClusterId1, subClusterIdAdd);

if (isCachingEnabled.booleanValue()) {
Cache<Object, Object> cache = facade.getCache();
Object cacheKey = facade.getAppHomeSubClusterCacheRequest(appId);
Object subClusterIdByCache = cache.get(cacheKey);
Assert.assertEquals(subClusterIdByFacade, subClusterIdByCache);
Assert.assertEquals(subClusterId1, subClusterIdByCache);
}
}

}