-
Notifications
You must be signed in to change notification settings - Fork 9.1k
YARN-11350. [Federation] Router Support DelegationToken With ZK. #5131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
@goiri Can you help review this pr? Thank you very much! |
.../java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
Show resolved
Hide resolved
ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate()); | ||
} catch (Exception ex) { | ||
if (!quiet) { | ||
LOG.error("No node in path [" + nodePath + "]"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use {}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix it.
* @return | ||
* @throws IOException | ||
*/ | ||
private RouterMasterKey getRouterMasterKeyFromZK(String nodePath, boolean quiet) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see much value in this quiet option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you, I will remove the quiet option.
@@ -879,7 +879,7 @@ public void testStoreNewMasterKey() throws Exception { | |||
} | |||
|
|||
@Test | |||
public void testGetMasterKeyByDelegationKey() throws YarnException, IOException { | |||
public void testGetMasterKeyByDelegationKey() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is correct but do we need to add all this churn?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception thrown by the method signature, I will recover. In the process of writing JunitTest code, I used curatorFramework.getData().forPath
method to get data from ZK. This method throws an Exception
, so I modified the exception throwing code of the method signature.
/**
* Commit the currently building operation using the given path
*
* @param path the path
* @return operation result if any
* @throws Exception errors
*/
public T forPath(String path) throws Exception;
@Test(expected = NotImplementedException.class) | ||
public void testRemoveStoredMasterKey() throws YarnException, IOException { | ||
super.testRemoveStoredMasterKey(); | ||
public void testUpdateStoredToken() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we keep the Test annotation in all these methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your suggestion, I will modify the code.
…-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java Co-authored-by: Inigo Goiri <elgoiri@gmail.com>
Thanks @slfan1989 , Great work here! |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
@goiri Can you help review this PR again? Thank you very much! |
🎊 +1 overall
This message was automatically generated. |
@Hexiaoqiao Thank you very much for your suggestion, I agree with your idea. After completing this pr, I will continue to carefully read the codes of the HDFS RBF / KMS modules about delegation token, and then extract this part of the code to the common module. If other modules need it, we can be used directly. |
🎊 +1 overall
This message was automatically generated. |
@goiri Can you help to review this PR again? Thank you very much! |
@Override | ||
public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) | ||
public synchronized RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are we protecting with the synchronized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for your help in reviewing the code. In ZookeeperFederationStateStore, we have 7 methods that use the synchronized keyword
- Group1 methods are mainly used to create or remove MasterKey
- storeNewMasterKey
- removeStoredMasterKey
- Group2 methods are mainly used to create or update and remove Token
- storeNewToken
- updateStoredToken
- removeStoredToken
- Group3 methods are used to increment DelegationTokenSeqNum and MasterKeyId
- incrementDelegationTokenSeqNum
- incrementCurrentKeyId
This part is mainly to protect the ZNode of ZK. We hope that the same Router has only one thread to write or update or delete the same ZNode at the same time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use a finer grain lock. There are many things synchronized that can be done in parallel. Maybe a lock per group too. Could we even do a read write lock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your suggestion, I will refactor this part of the code using lock
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use a finer grain lock. There are many things synchronized that can be done in parallel. Maybe a lock per group too. Could we even do a read write lock?
Thank you very much for your suggestion, your understanding is accurate, if we use synchronized keyword at method level, there is too much involved.
When completing the PR (#5169), JIRA: YARN-11349. [Federation] Router Support Delegation Token With SQL.
I carefully read the AbstractDelegationTokenSecretManager
code, I think we should be able to remove the synchronized
keyword in ZookeeperFederationStateStore
.
I am worried that concurrent access may cause different threads to generate the same primary key data, so that errors will occur when storing, so in the methods of adding, deleting, and updating MasterKey and Token I used the synchronized
method.
But I found out that this does not happen for generating the same primary key.
Group1 methods are mainly used to create or remove MasterKey
The key information in RouterMasterKey
is DelegationKey
, and the primary key of DelegationKey
is keyId
.
We found that DelegationKey
will be constructed in AbstractDelegationTokenSecretManager#updateCurrentKey
.
- AbstractDelegationTokenSecretManager#updateCurrentKey
This method is protected by synchronized when generating the primary key keyId and storing the masterKey, so the uniqueness of the primary key can be guaranteed
private void updateCurrentKey() throws IOException {
LOG.info("Updating the current master key for generating delegation tokens");
int newCurrentId;
// 1. newCurrentId is keyId, which has been protected
synchronized (this) {
newCurrentId = incrementCurrentKeyId();
}
DelegationKey newKey = new DelegationKey(newCurrentId, System
.currentTimeMillis()
+ keyUpdateInterval + tokenMaxLifetime, generateSecret());
logUpdateMasterKey(newKey);
// 2. storeDelegationKey calls storeNewMasterKey, this part has been protected
synchronized (this) {
currentKey = newKey;
storeDelegationKey(currentKey);
}
}
protected void storeDelegationKey(DelegationKey key) throws IOException {
allKeys.put(key.getKeyId(), key);
storeNewMasterKey(key);
}
We can find that the method of removeMasterKey
is also protected.
- AbstractDelegationTokenSecretManager#removeExpiredKeys
private synchronized void removeExpiredKeys() {
long now = Time.now();
for (Iterator<Map.Entry<Integer, DelegationKey>> it = allKeys.entrySet()
.iterator(); it.hasNext();) {
Map.Entry<Integer, DelegationKey> e = it.next();
if (e.getValue().getExpiryDate() < now) {
it.remove();
// ensure the tokens generated by this current key can be recovered
// with this current key after this current key is rolled
if(!e.getValue().equals(currentKey))
removeStoredMasterKey(e.getValue());
}
}
}
Group2 methods are mainly used to create or update and remove Token
For DelegationToken
, we can also find that the add
, update
, and remove
methods are all protected.
- AbstractDelegationTokenSecretManager#storeToken
protected synchronized byte[] createPassword(TokenIdent identifier) {
.....
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
try {
// storeToken
METRICS.trackStoreToken(() -> storeToken(identifier, tokenInfo));
} catch (IOException ioe) {
LOG.error("Could not store token " + formatTokenId(identifier) + "!!",
ioe);
}
return password;
}
- AbstractDelegationTokenSecretManager#renewToken
public synchronized long renewToken(Token<TokenIdent> token,
String renewer) throws InvalidToken, IOException {
......
long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
String trackingId = getTrackingIdIfEnabled(id);
DelegationTokenInformation info = new DelegationTokenInformation(renewTime,
password, trackingId);
if (getTokenInfo(id) == null) {
throw new InvalidToken("Renewal request for unknown token "
+ formatTokenId(id));
}
// updateToken
METRICS.trackUpdateToken(() -> updateToken(id, info));
return renewTime;
}
- AbstractDelegationTokenSecretManager#cancelToken
public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
String canceller) throws IOException {
......
DelegationTokenInformation info = currentTokens.remove(id);
if (info == null) {
throw new InvalidToken("Token not found " + formatTokenId(id));
}
// cancelToken
METRICS.trackRemoveToken(() -> {
removeTokenForOwnerStats(id);
removeStoredToken(id);
});
return id;
}
Group3 methods are used to increment
DelegationTokenSeqNum
andMasterKeyId
These two methods are also protected in the abstract class.
- AbstractDelegationTokenSecretManager#incrementDelegationTokenSeqNum
protected synchronized int incrementDelegationTokenSeqNum() {
return ++delegationTokenSequenceNumber;
}
- AbstractDelegationTokenSecretManager#incrementCurrentKeyId
protected synchronized int incrementCurrentKeyId() {
return ++currentId;
}
@@ -905,7 +905,7 @@ public void testGetMasterKeyByDelegationKey() throws YarnException, IOException | |||
} | |||
|
|||
@Test | |||
public void testRemoveStoredMasterKey() throws YarnException, IOException { | |||
public void testRemoveStoredMasterKey() throws IOException, YarnException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you so much for reviewing the code, I will fix it.
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate()); | ||
FederationStateStore stateStore = this.getStateStore(); | ||
|
||
assertTrue(stateStore instanceof ZookeeperFederationStateStore); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use the super method and have a function to check these things we do in every method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for helping to review the code, I will fix it.
RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); | ||
RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); | ||
RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); | ||
Assert.assertNotNull(routerRMTokenResponse); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you statically imported assertTrue and others, import this too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix it.
@goiri Can you help review pr(#5182)? Thank you very much! I fixed Java Doc issue for In the process of compiling YARN-11350, I encountered some java doc compilation errors. We can see the compilation report as follows: |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest; | ||
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest; | ||
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest; | ||
import org.apache.hadoop.yarn.server.federation.store.records.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for helping to review the code, I will fix it.
...st/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
Show resolved
Hide resolved
public void testStoreNewMasterKey() throws Exception { | ||
super.testStoreNewMasterKey(); | ||
} | ||
|
||
@Test(expected = NotImplementedException.class) | ||
@Test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can just inherit and no need to have them here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you, I will remove this part of the code.
nodeCreatePath); | ||
|
||
// Write master key data to zk. | ||
try(ByteArrayOutputStream os = new ByteArrayOutputStream(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Space after try.
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
@goiri Can you help to merge this pr into the trunk branch? Thank you very much! We have fixed all java doc issues for |
💔 -1 overall
This message was automatically generated. |
@goiri Thank you very much for helping to review the code! |
JIRA: YARN-11350. [Federation] Router Support DelegationToken With ZK.
YARN Router needs to support DelegationToken. In the process of implementing this feature, refer to RMDelegationTokenSecretManager, HDFS RBF#ZKDelegationTokenSecretManager.
There are a few things to keep in mind to implement this feature:
1.How to store and query
masterkey
?2.How to store and query
DelegationToken
?3.How to store and query
SequenceNum
?For
question 1
andquestion 2
, we can directly refer to RMDelegationTokenSecretManager to serialize the data and store it in ZK.For
question 3
, we need to consider multiple routers sharing sequenceNum, and we need to ensure that the self-increment of sequenceNum can be shared among multiple routers. At this point we will use curator's SharedCount to implement this function.For the introduction of
SharedCount
, we can refer to the following articleshttps://curator.apache.org/curator-recipes/shared-counter.html.
Storage display of
delegationToken
in zk.ls /federationstore

ls /federationstore/router_rm_dt_secret_manager_root

The data of

masterKey
will be stored inrouter_rm_dt_master_keys_root
The ZNode generation rule of
masterKey
isdelegation_key_ + keyId
ls /federationstore/router_rm_dt_secret_manager_root/router_rm_dt_master_keys_root
The data of

delegationToken
will be stored inrouter_rm_delegation_tokens_root
ls /federationstore/router_rm_dt_secret_manager_root/router_rm_delegation_tokens_root
The ZNode generation rule of
delegationToken
isrm_delegation_token_ + keyId
The data of

sequenceNum
will be stored inrouter_rm_dt_sequential_number