5353import org .apache .hadoop .security .token .Token ;
5454import org .apache .hadoop .security .token .delegation .web .DelegationTokenManager ;
5555import org .apache .hadoop .util .curator .ZKCuratorManager ;
56+ import static org .apache .hadoop .util .Time .now ;
5657import org .apache .zookeeper .CreateMode ;
5758import org .apache .zookeeper .KeeperException ;
5859import org .apache .zookeeper .KeeperException .NoNodeException ;
7778public abstract class ZKDelegationTokenSecretManager <TokenIdent extends AbstractDelegationTokenIdentifier >
7879 extends AbstractDelegationTokenSecretManager <TokenIdent > {
7980
80- private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager." ;
81+ public static final String ZK_CONF_PREFIX = "zk-dt-secret-manager." ;
8182 public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX
8283 + "zkNumRetries" ;
8384 public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX
@@ -100,6 +101,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
100101 + "kerberos.server.principal" ;
101102 public static final String ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE = ZK_CONF_PREFIX
102103 + "token.seqnum.batch.size" ;
104+ public static final String ZK_DTSM_TOKEN_WATCHER_ENABLED = ZK_CONF_PREFIX
105+ + "token.watcher.enabled" ;
106+ public static final boolean ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT = true ;
103107
104108 public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3 ;
105109 public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000 ;
@@ -118,7 +122,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
118122 private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot" ;
119123 private static final String ZK_DTSM_SEQNUM_ROOT = "/ZKDTSMSeqNumRoot" ;
120124 private static final String ZK_DTSM_KEYID_ROOT = "/ZKDTSMKeyIdRoot" ;
121- private static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot" ;
125+ protected static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot" ;
122126 private static final String ZK_DTSM_MASTER_KEY_ROOT = "/ZKDTSMMasterKeyRoot" ;
123127
124128 private static final String DELEGATION_KEY_PREFIX = "DK_" ;
@@ -132,7 +136,7 @@ public static void setCurator(CuratorFramework curator) {
132136 }
133137
134138 private final boolean isExternalClient ;
135- private final CuratorFramework zkClient ;
139+ protected final CuratorFramework zkClient ;
136140 private SharedCount delTokSeqCounter ;
137141 private SharedCount keyIdSeqCounter ;
138142 private CuratorCacheBridge keyCache ;
@@ -141,6 +145,8 @@ public static void setCurator(CuratorFramework curator) {
141145 private int currentSeqNum ;
142146 private int currentMaxSeqNum ;
143147
148+ private final boolean isTokenWatcherEnabled ;
149+
144150 public ZKDelegationTokenSecretManager (Configuration conf ) {
145151 super (conf .getLong (DelegationTokenManager .UPDATE_INTERVAL ,
146152 DelegationTokenManager .UPDATE_INTERVAL_DEFAULT ) * 1000 ,
@@ -152,6 +158,8 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
152158 DelegationTokenManager .REMOVAL_SCAN_INTERVAL_DEFAULT ) * 1000 );
153159 seqNumBatchSize = conf .getInt (ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE ,
154160 ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT );
161+ isTokenWatcherEnabled = conf .getBoolean (ZK_DTSM_TOKEN_WATCHER_ENABLED ,
162+ ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT );
155163 if (CURATOR_TL .get () != null ) {
156164 zkClient =
157165 CURATOR_TL .get ().usingNamespace (
@@ -253,8 +261,7 @@ public void startThreads() throws IOException {
253261 // If namespace parents are implicitly created, they won't have ACLs.
254262 // So, let's explicitly create them.
255263 CuratorFramework nullNsFw = zkClient .usingNamespace (null );
256- EnsurePath ensureNs =
257- nullNsFw .newNamespaceAwareEnsurePath ("/" + zkClient .getNamespace ());
264+ EnsurePath ensureNs = nullNsFw .newNamespaceAwareEnsurePath ("/" + zkClient .getNamespace ());
258265 try {
259266 ensureNs .ensure (nullNsFw .getZookeeperClient ());
260267 } catch (Exception e ) {
@@ -270,8 +277,7 @@ public void startThreads() throws IOException {
270277 // by calling the incrSharedCount
271278 currentSeqNum = incrSharedCount (delTokSeqCounter , seqNumBatchSize );
272279 currentMaxSeqNum = currentSeqNum + seqNumBatchSize ;
273- LOG .info ("Fetched initial range of seq num, from {} to {} " ,
274- currentSeqNum +1 , currentMaxSeqNum );
280+ LOG .info ("Fetched initial range of seq num, from {} to {} " , currentSeqNum + 1 , currentMaxSeqNum );
275281 } catch (Exception e ) {
276282 throw new IOException ("Could not start Sequence Counter" , e );
277283 }
@@ -290,60 +296,49 @@ public void startThreads() throws IOException {
290296 throw new RuntimeException ("Could not create ZK paths" );
291297 }
292298 try {
293- keyCache = CuratorCache .bridgeBuilder (zkClient , ZK_DTSM_MASTER_KEY_ROOT )
294- .build ();
295- CuratorCacheListener keyCacheListener = CuratorCacheListener .builder ()
296- .forCreatesAndChanges ((oldNode , node ) -> {
297- try {
298- processKeyAddOrUpdate (node .getData ());
299- } catch (IOException e ) {
300- LOG .error ("Error while processing Curator keyCacheListener "
301- + "NODE_CREATED / NODE_CHANGED event" );
302- throw new UncheckedIOException (e );
303- }
304- })
305- .forDeletes (childData -> processKeyRemoved (childData .getPath ()))
306- .build ();
299+ keyCache = CuratorCache .bridgeBuilder (zkClient , ZK_DTSM_MASTER_KEY_ROOT ).build ();
300+ CuratorCacheListener keyCacheListener = CuratorCacheListener .builder ().forCreatesAndChanges ((oldNode , node ) -> {
301+ try {
302+ processKeyAddOrUpdate (node .getData ());
303+ } catch (IOException e ) {
304+ LOG .error ("Error while processing Curator keyCacheListener " + "NODE_CREATED / NODE_CHANGED event" );
305+ throw new UncheckedIOException (e );
306+ }
307+ }).forDeletes (childData -> processKeyRemoved (childData .getPath ())).build ();
307308 keyCache .listenable ().addListener (keyCacheListener );
308309 keyCache .start ();
309310 loadFromZKCache (false );
310311 } catch (Exception e ) {
311- throw new IOException ("Could not start Curator keyCacheListener for keys" ,
312- e );
312+ throw new IOException ("Could not start Curator keyCacheListener for keys" , e );
313313 }
314- try {
315- tokenCache = CuratorCache .bridgeBuilder (zkClient , ZK_DTSM_TOKENS_ROOT )
316- .build ();
317- CuratorCacheListener tokenCacheListener = CuratorCacheListener .builder ()
318- .forCreatesAndChanges ((oldNode , node ) -> {
319- try {
320- processTokenAddOrUpdate (node .getData ());
321- } catch (IOException e ) {
322- LOG .error ("Error while processing Curator tokenCacheListener "
323- + "NODE_CREATED / NODE_CHANGED event" );
324- throw new UncheckedIOException (e );
325- }
326- })
327- .forDeletes (childData -> {
328- try {
329- processTokenRemoved (childData );
330- } catch (IOException e ) {
331- LOG .error ("Error while processing Curator tokenCacheListener "
332- + "NODE_DELETED event" );
333- throw new UncheckedIOException (e );
334- }
335- })
336- .build ();
337- tokenCache .listenable ().addListener (tokenCacheListener );
338- tokenCache .start ();
339- loadFromZKCache (true );
340- } catch (Exception e ) {
341- throw new IOException (
342- "Could not start Curator tokenCacheListener for tokens" , e );
314+ if (isTokenWatcherEnabled ) {
315+ LOG .info ("TokenCache is enabled" );
316+ try {
317+ tokenCache = CuratorCache .bridgeBuilder (zkClient , ZK_DTSM_TOKENS_ROOT ).build ();
318+ CuratorCacheListener tokenCacheListener = CuratorCacheListener .builder ().forCreatesAndChanges ((oldNode , node ) -> {
319+ try {
320+ processTokenAddOrUpdate (node .getData ());
321+ } catch (IOException e ) {
322+ LOG .error ("Error while processing Curator tokenCacheListener " + "NODE_CREATED / NODE_CHANGED event" );
323+ throw new UncheckedIOException (e );
324+ }
325+ }).forDeletes (childData -> {
326+ try {
327+ processTokenRemoved (childData );
328+ } catch (IOException e ) {
329+ LOG .error ("Error while processing Curator tokenCacheListener " + "NODE_DELETED event" );
330+ throw new UncheckedIOException (e );
331+ }
332+ }).build ();
333+ tokenCache .listenable ().addListener (tokenCacheListener );
334+ tokenCache .start ();
335+ loadFromZKCache (true );
336+ } catch (Exception e ) {
337+ throw new IOException ("Could not start Curator tokenCacheListener for tokens" , e );
338+ }
343339 }
344340 super .startThreads ();
345- }
346-
341+ }
347342 /**
348343 * Load the CuratorCache into the in-memory map. Possible caches to be
349344 * loaded are keyCache and tokenCache.
@@ -386,9 +381,7 @@ private void processKeyAddOrUpdate(byte[] data) throws IOException {
386381 DataInputStream din = new DataInputStream (bin );
387382 DelegationKey key = new DelegationKey ();
388383 key .readFields (din );
389- synchronized (this ) {
390- allKeys .put (key .getKeyId (), key );
391- }
384+ allKeys .put (key .getKeyId (), key );
392385 }
393386
394387 private void processKeyRemoved (String path ) {
@@ -398,14 +391,12 @@ private void processKeyRemoved(String path) {
398391 int j = tokSeg .indexOf ('_' );
399392 if (j > 0 ) {
400393 int keyId = Integer .parseInt (tokSeg .substring (j + 1 ));
401- synchronized (this ) {
402- allKeys .remove (keyId );
403- }
394+ allKeys .remove (keyId );
404395 }
405396 }
406397 }
407398
408- private void processTokenAddOrUpdate (byte [] data ) throws IOException {
399+ protected TokenIdent processTokenAddOrUpdate (byte [] data ) throws IOException {
409400 ByteArrayInputStream bin = new ByteArrayInputStream (data );
410401 DataInputStream din = new DataInputStream (bin );
411402 TokenIdent ident = createIdentifier ();
@@ -417,24 +408,18 @@ private void processTokenAddOrUpdate(byte[] data) throws IOException {
417408 if (numRead > -1 ) {
418409 DelegationTokenInformation tokenInfo =
419410 new DelegationTokenInformation (renewDate , password );
420- synchronized (this ) {
421- currentTokens .put (ident , tokenInfo );
422- // The cancel task might be waiting
423- notifyAll ();
424- }
411+ currentTokens .put (ident , tokenInfo );
412+ return ident ;
425413 }
414+ return null ;
426415 }
427416
428417 private void processTokenRemoved (ChildData data ) throws IOException {
429418 ByteArrayInputStream bin = new ByteArrayInputStream (data .getData ());
430419 DataInputStream din = new DataInputStream (bin );
431420 TokenIdent ident = createIdentifier ();
432421 ident .readFields (din );
433- synchronized (this ) {
434- currentTokens .remove (ident );
435- // The cancel task might be waiting
436- notifyAll ();
437- }
422+ currentTokens .remove (ident );
438423 }
439424
440425 @ Override
@@ -621,7 +606,7 @@ protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
621606 *
622607 * @param ident Identifier of the token
623608 */
624- private synchronized void syncLocalCacheWithZk (TokenIdent ident ) {
609+ protected void syncLocalCacheWithZk (TokenIdent ident ) {
625610 try {
626611 DelegationTokenInformation tokenInfo = getTokenInfoFromZK (ident );
627612 if (tokenInfo != null && !currentTokens .containsKey (ident )) {
@@ -635,16 +620,21 @@ private synchronized void syncLocalCacheWithZk(TokenIdent ident) {
635620 }
636621 }
637622
638- private DelegationTokenInformation getTokenInfoFromZK (TokenIdent ident )
623+ protected DelegationTokenInformation getTokenInfoFromZK (TokenIdent ident )
639624 throws IOException {
640625 return getTokenInfoFromZK (ident , false );
641626 }
642627
643- private DelegationTokenInformation getTokenInfoFromZK (TokenIdent ident ,
628+ protected DelegationTokenInformation getTokenInfoFromZK (TokenIdent ident ,
644629 boolean quiet ) throws IOException {
645630 String nodePath =
646631 getNodePath (ZK_DTSM_TOKENS_ROOT ,
647632 DELEGATION_TOKEN_PREFIX + ident .getSequenceNumber ());
633+ return getTokenInfoFromZK (nodePath , quiet );
634+ }
635+
636+ protected DelegationTokenInformation getTokenInfoFromZK (String nodePath ,
637+ boolean quiet ) throws IOException {
648638 try {
649639 byte [] data = zkClient .getData ().forPath (nodePath );
650640 if ((data == null ) || (data .length == 0 )) {
@@ -779,15 +769,30 @@ protected void updateToken(TokenIdent ident,
779769 @ Override
780770 protected void removeStoredToken (TokenIdent ident )
781771 throws IOException {
772+ removeStoredToken (ident , false );
773+ }
774+
775+ protected void removeStoredToken (TokenIdent ident ,
776+ boolean checkAgainstZkBeforeDeletion ) throws IOException {
782777 String nodeRemovePath =
783778 getNodePath (ZK_DTSM_TOKENS_ROOT , DELEGATION_TOKEN_PREFIX
784779 + ident .getSequenceNumber ());
785- if (LOG .isDebugEnabled ()) {
786- LOG .debug ("Removing ZKDTSMDelegationToken_"
787- + ident .getSequenceNumber ());
788- }
789780 try {
790- if (zkClient .checkExists ().forPath (nodeRemovePath ) != null ) {
781+ DelegationTokenInformation dtInfo = getTokenInfoFromZK (ident , true );
782+ if (dtInfo != null ) {
783+ // For the case there is no sync or watch miss, it is possible that the
784+ // local storage has expired tokens which have been renewed by peer
785+ // so double check again to avoid accidental delete
786+ if (checkAgainstZkBeforeDeletion
787+ && dtInfo .getRenewDate () > now ()) {
788+ LOG .info ("Node already renewed by peer " + nodeRemovePath +
789+ " so this token should not be deleted" );
790+ return ;
791+ }
792+ if (LOG .isDebugEnabled ()) {
793+ LOG .debug ("Removing ZKDTSMDelegationToken_"
794+ + ident .getSequenceNumber ());
795+ }
791796 while (zkClient .checkExists ().forPath (nodeRemovePath ) != null ){
792797 try {
793798 zkClient .delete ().guaranteed ().forPath (nodeRemovePath );
@@ -810,7 +815,7 @@ protected void removeStoredToken(TokenIdent ident)
810815 }
811816
812817 @ Override
813- public synchronized TokenIdent cancelToken (Token <TokenIdent > token ,
818+ public TokenIdent cancelToken (Token <TokenIdent > token ,
814819 String canceller ) throws IOException {
815820 ByteArrayInputStream buf = new ByteArrayInputStream (token .getIdentifier ());
816821 DataInputStream in = new DataInputStream (buf );
@@ -821,7 +826,7 @@ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
821826 return super .cancelToken (token , canceller );
822827 }
823828
824- private void addOrUpdateToken (TokenIdent ident ,
829+ protected void addOrUpdateToken (TokenIdent ident ,
825830 DelegationTokenInformation info , boolean isUpdate ) throws Exception {
826831 String nodeCreatePath =
827832 getNodePath (ZK_DTSM_TOKENS_ROOT , DELEGATION_TOKEN_PREFIX
@@ -848,6 +853,10 @@ private void addOrUpdateToken(TokenIdent ident,
848853 }
849854 }
850855
856+ public boolean isTokenWatcherEnabled () {
857+ return isTokenWatcherEnabled ;
858+ }
859+
851860 /**
852861 * Simple implementation of an {@link ACLProvider} that simply returns an ACL
853862 * that gives all permissions only to a single principal.
0 commit comments