Skip to content

Commit

Permalink
HADOOP-16828. Zookeeper Delegation Token Manager fetch sequence numbe…
Browse files Browse the repository at this point in the history
…r by batch. Contributed by Fengnan Li.
  • Loading branch information
xiaoyuyao committed Jun 2, 2020
1 parent ed83c86 commit 6288e15
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,16 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
+ "kerberos.keytab";
public static final String ZK_DTSM_ZK_KERBEROS_PRINCIPAL = ZK_CONF_PREFIX
+ "kerberos.principal";
public static final String ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE = ZK_CONF_PREFIX
+ "token.seqnum.batch.size";

public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000;
public static final int ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT = 10000;
public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm";
// by default it is still incrementing seq number by 1 each time
public static final int ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT = 1;

private static Logger LOG = LoggerFactory
.getLogger(ZKDelegationTokenSecretManager.class);
Expand Down Expand Up @@ -135,6 +139,9 @@ public static void setCurator(CuratorFramework curator) {
private PathChildrenCache tokenCache;
private ExecutorService listenerThreadPool;
private final long shutdownTimeout;
private final int seqNumBatchSize;
private int currentSeqNum;
private int currentMaxSeqNum;

public ZKDelegationTokenSecretManager(Configuration conf) {
super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
Expand All @@ -147,6 +154,8 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
shutdownTimeout = conf.getLong(ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
if (CURATOR_TL.get() != null) {
zkClient =
CURATOR_TL.get().usingNamespace(
Expand Down Expand Up @@ -322,6 +331,12 @@ public void startThreads() throws IOException {
if (delTokSeqCounter != null) {
delTokSeqCounter.start();
}
// the first batch range should be allocated during this starting window
// by calling the incrSharedCount
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
LOG.info("Fetched initial range of seq num, from {} to {} ",
currentSeqNum+1, currentMaxSeqNum);
} catch (Exception e) {
throw new IOException("Could not start Sequence Counter", e);
}
Expand Down Expand Up @@ -562,28 +577,41 @@ protected int getDelegationTokenSeqNum() {
return delTokSeqCounter.getCount();
}

private void incrSharedCount(SharedCount sharedCount) throws Exception {
private int incrSharedCount(SharedCount sharedCount, int batchSize)
throws Exception {
while (true) {
// Loop until we successfully increment the counter
VersionedValue<Integer> versionedValue = sharedCount.getVersionedValue();
if (sharedCount.trySetCount(versionedValue, versionedValue.getValue() + 1)) {
break;
if (sharedCount.trySetCount(
versionedValue, versionedValue.getValue() + batchSize)) {
return versionedValue.getValue();
}
}
}

@Override
protected int incrementDelegationTokenSeqNum() {
try {
incrSharedCount(delTokSeqCounter);
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so dont do anything..
LOG.debug("Thread interrupted while performing token counter increment", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e);
// The secret manager will keep a local range of seq num which won't be
// seen by peers, so only when the range is exhausted it will ask zk for
// another range again
if (currentSeqNum >= currentMaxSeqNum) {
try {
// after a successful batch request, we can get the range starting point
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
LOG.info("Fetched new range of seq num, from {} to {} ",
currentSeqNum+1, currentMaxSeqNum);
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so dont do anything..
LOG.debug(
"Thread interrupted while performing token counter increment", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e);
}
}
return delTokSeqCounter.getCount();

return ++currentSeqNum;
}

@Override
Expand All @@ -603,7 +631,7 @@ protected int getCurrentKeyId() {
@Override
protected int incrementCurrentKeyId() {
try {
incrSharedCount(keyIdSeqCounter);
incrSharedCount(keyIdSeqCounter, 1);
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so dont do anything..
LOG.debug("Thread interrupted while performing keyId increment", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,58 @@ public void testNodeUpAferAWhile() throws Exception {
}
}

@SuppressWarnings("unchecked")
@Test
public void testMultiNodeCompeteForSeqNum() throws Exception {
DelegationTokenManager tm1, tm2 = null;
String connectString = zkServer.getConnectString();
Configuration conf = getSecretConf(connectString);
conf.setInt(
ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE, 1000);
tm1 = new DelegationTokenManager(conf, new Text("bla"));
tm1.init();

Token<DelegationTokenIdentifier> token1 =
(Token<DelegationTokenIdentifier>) tm1.createToken(
UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token1);
AbstractDelegationTokenIdentifier id1 =
tm1.getDelegationTokenSecretManager().decodeTokenIdentifier(token1);
Assert.assertEquals(
"Token seq should be the same", 1, id1.getSequenceNumber());
Token<DelegationTokenIdentifier> token2 =
(Token<DelegationTokenIdentifier>) tm1.createToken(
UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token2);
AbstractDelegationTokenIdentifier id2 =
tm1.getDelegationTokenSecretManager().decodeTokenIdentifier(token2);
Assert.assertEquals(
"Token seq should be the same", 2, id2.getSequenceNumber());

tm2 = new DelegationTokenManager(conf, new Text("bla"));
tm2.init();

Token<DelegationTokenIdentifier> token3 =
(Token<DelegationTokenIdentifier>) tm2.createToken(
UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token3);
AbstractDelegationTokenIdentifier id3 =
tm2.getDelegationTokenSecretManager().decodeTokenIdentifier(token3);
Assert.assertEquals(
"Token seq should be the same", 1001, id3.getSequenceNumber());
Token<DelegationTokenIdentifier> token4 =
(Token<DelegationTokenIdentifier>) tm2.createToken(
UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token4);
AbstractDelegationTokenIdentifier id4 =
tm2.getDelegationTokenSecretManager().decodeTokenIdentifier(token4);
Assert.assertEquals(
"Token seq should be the same", 1002, id4.getSequenceNumber());

verifyDestroy(tm1, conf);
verifyDestroy(tm2, conf);
}

@SuppressWarnings("unchecked")
@Test
public void testRenewTokenSingleManager() throws Exception {
Expand Down

0 comments on commit 6288e15

Please sign in to comment.