-
Notifications
You must be signed in to change notification settings - Fork 8.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
YARN-10885. Make FederationStateStoreFacade#getApplicationHomeSubCluster use JCache. #4701
Changes from 8 commits
d45e599
6ad9607
1454a06
0c9d648
12d77ad
0eefb19
f6096e2
ce5ca51
fb42275
7a9c9a5
0ed4dd4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -86,6 +86,8 @@ public final class FederationStateStoreFacade { | |
private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters"; | ||
private static final String GET_POLICIES_CONFIGURATIONS_CACHEID = | ||
"getPoliciesConfigurations"; | ||
private static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID = | ||
"getApplicationHomeSubCluster"; | ||
|
||
private static final FederationStateStoreFacade FACADE = | ||
new FederationStateStoreFacade(); | ||
|
@@ -376,10 +378,23 @@ public void updateApplicationHomeSubCluster( | |
*/ | ||
public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) | ||
throws YarnException { | ||
GetApplicationHomeSubClusterResponse response = | ||
stateStore.getApplicationHomeSubCluster( | ||
try { | ||
if (isCachingEnabled()) { | ||
Object value = | ||
cache.get(buildGetApplicationHomeSubClusterRequest(appId)); | ||
if (value instanceof SubClusterId) { | ||
return (SubClusterId) value; | ||
} else { | ||
throw new YarnException("Cannot be converted to SubClusterId."); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe add the type of the original. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This part of the code does need cast, I simplified the code. The main reasons are as follows: The key and value used by the Cache defined here are both Object types.
|
||
} | ||
} else { | ||
GetApplicationHomeSubClusterResponse response = stateStore.getApplicationHomeSubCluster( | ||
GetApplicationHomeSubClusterRequest.newInstance(appId)); | ||
return response.getApplicationHomeSubCluster().getHomeSubCluster(); | ||
return response.getApplicationHomeSubCluster().getHomeSubCluster(); | ||
} | ||
} catch (Throwable ex) { | ||
throw new YarnException(ex); | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -513,6 +528,26 @@ public Map<String, SubClusterPolicyConfiguration> invoke( | |
return cacheRequest; | ||
} | ||
|
||
private Object buildGetApplicationHomeSubClusterRequest(ApplicationId applicationId) { | ||
final String cacheKey = buildCacheKey(getClass().getSimpleName(), | ||
GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, applicationId.toString()); | ||
CacheRequest<String, SubClusterId> cacheRequest = | ||
new CacheRequest<>( | ||
cacheKey, | ||
input -> { | ||
GetApplicationHomeSubClusterResponse response = | ||
stateStore.getApplicationHomeSubCluster( | ||
GetApplicationHomeSubClusterRequest.newInstance(applicationId)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extract the request and probably do indentation like:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will fix it. |
||
|
||
ApplicationHomeSubCluster applicationHomeSubCluster = | ||
response.getApplicationHomeSubCluster(); | ||
SubClusterId subClusterId = applicationHomeSubCluster.getHomeSubCluster(); | ||
|
||
return subClusterId; | ||
}); | ||
return cacheRequest; | ||
} | ||
|
||
protected String buildCacheKey(String typeName, String methodName, | ||
String argName) { | ||
StringBuilder buffer = new StringBuilder(); | ||
|
@@ -560,7 +595,7 @@ private static class CacheRequest<K, V> { | |
private K key; | ||
private Func<K, V> func; | ||
|
||
public CacheRequest(K key, Func<K, V> func) { | ||
CacheRequest(K key, Func<K, V> func) { | ||
this.key = key; | ||
this.func = func; | ||
} | ||
|
@@ -609,4 +644,14 @@ public boolean equals(Object obj) { | |
protected interface Func<T, TResult> { | ||
TResult invoke(T input) throws Exception; | ||
} | ||
|
||
@VisibleForTesting | ||
public Cache<Object, Object> getCache() { | ||
return cache; | ||
} | ||
|
||
@VisibleForTesting | ||
protected Object getAppHomeSubClusterCacheRequest(ApplicationId applicationId) { | ||
return buildGetApplicationHomeSubClusterRequest(applicationId); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,8 @@ | |
import org.junit.runners.Parameterized; | ||
import org.junit.runners.Parameterized.Parameters; | ||
|
||
import javax.cache.Cache; | ||
|
||
/** | ||
* Unit tests for FederationStateStoreFacade. | ||
*/ | ||
|
@@ -64,12 +66,14 @@ public static Collection<Boolean[]> getParameters() { | |
private FederationStateStoreTestUtil stateStoreTestUtil; | ||
private FederationStateStoreFacade facade = | ||
FederationStateStoreFacade.getInstance(); | ||
private Boolean isCachingEnabled; | ||
|
||
public TestFederationStateStoreFacade(Boolean isCachingEnabled) { | ||
conf = new Configuration(); | ||
if (!(isCachingEnabled.booleanValue())) { | ||
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); | ||
} | ||
this.isCachingEnabled = isCachingEnabled; | ||
} | ||
|
||
@Before | ||
|
@@ -206,4 +210,27 @@ public void testAddApplicationHomeSubCluster() throws YarnException { | |
Assert.assertEquals(subClusterId1, result); | ||
} | ||
|
||
@Test | ||
public void testGetApplicationHomeSubClusterCache() throws YarnException { | ||
ApplicationId appId = ApplicationId.newInstance(clusterTs, numApps + 1); | ||
SubClusterId subClusterId1 = SubClusterId.newInstance("Home1"); | ||
|
||
ApplicationHomeSubCluster appHomeSubCluster = | ||
ApplicationHomeSubCluster.newInstance(appId, subClusterId1); | ||
SubClusterId subClusterIdAdd = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One line? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will fix it. |
||
facade.addApplicationHomeSubCluster(appHomeSubCluster); | ||
|
||
SubClusterId subClusterIdByFacade = facade.getApplicationHomeSubCluster(appId); | ||
Assert.assertEquals(subClusterIdByFacade, subClusterIdAdd); | ||
Assert.assertEquals(subClusterId1, subClusterIdAdd); | ||
|
||
if (isCachingEnabled.booleanValue()) { | ||
Cache<Object, Object> cache = facade.getCache(); | ||
Object cacheKey = facade.getAppHomeSubClusterCacheRequest(appId); | ||
Object subClusterIdByCache = cache.get(cacheKey); | ||
Assert.assertEquals(subClusterIdByFacade, subClusterIdByCache); | ||
Assert.assertEquals(subClusterId1, subClusterIdByCache); | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fits in one line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your help reviewing the code, I will fix it.