5656import org .apache .hadoop .security .AbstractAuthTokenDelegationManager ;
5757import org .apache .hadoop .security .token .Token ;
5858import org .apache .hadoop .security .token .delegation .web .DelegationTokenManager ;
59+ import static org .apache .hadoop .util .Time .now ;
5960import org .apache .zookeeper .CreateMode ;
6061import org .apache .zookeeper .KeeperException ;
6162import org .apache .zookeeper .KeeperException .NoNodeException ;
8081public abstract class ZKDelegationTokenSecretManager <TokenIdent extends AbstractDelegationTokenIdentifier >
8182 extends AbstractDelegationTokenSecretManager <TokenIdent > {
8283
83- private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager." ;
84+ public static final String ZK_CONF_PREFIX = "zk-dt-secret-manager." ;
8485 public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX
8586 + "zkNumRetries" ;
8687 public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX
@@ -99,6 +100,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
99100 + "kerberos.keytab" ;
100101 public static final String ZK_DTSM_ZK_KERBEROS_PRINCIPAL = ZK_CONF_PREFIX
101102 + "kerberos.principal" ;
103+ public static final String ZK_DTSM_TOKEN_WATCHER_ENABLED = ZK_CONF_PREFIX
104+ + "token.watcher.enabled" ;
105+ public static final boolean ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT = true ;
102106
103107 public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3 ;
104108 public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000 ;
@@ -115,7 +119,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
115119 private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot" ;
116120 private static final String ZK_DTSM_SEQNUM_ROOT = "/ZKDTSMSeqNumRoot" ;
117121 private static final String ZK_DTSM_KEYID_ROOT = "/ZKDTSMKeyIdRoot" ;
118- private static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot" ;
122+ protected static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot" ;
119123 private static final String ZK_DTSM_MASTER_KEY_ROOT = "/ZKDTSMMasterKeyRoot" ;
120124
121125 private static final String DELEGATION_KEY_PREFIX = "DK_" ;
@@ -129,14 +133,16 @@ public static void setCurator(CuratorFramework curator) {
129133 }
130134
131135 private final boolean isExternalClient ;
132- private final CuratorFramework zkClient ;
136+ protected final CuratorFramework zkClient ;
133137 private SharedCount delTokSeqCounter ;
134138 private SharedCount keyIdSeqCounter ;
135139 private PathChildrenCache keyCache ;
136140 private PathChildrenCache tokenCache ;
137141 private ExecutorService listenerThreadPool ;
138142 private final long shutdownTimeout ;
139143
144+ private final boolean isTokenWatcherEnabled ;
145+
140146 public ZKDelegationTokenSecretManager (Configuration conf ) {
141147 super (conf .getLong (DelegationTokenManager .UPDATE_INTERVAL ,
142148 DelegationTokenManager .UPDATE_INTERVAL_DEFAULT ) * 1000 ,
@@ -148,6 +154,8 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
148154 DelegationTokenManager .REMOVAL_SCAN_INTERVAL_DEFAULT ) * 1000 );
149155 shutdownTimeout = conf .getLong (ZK_DTSM_ZK_SHUTDOWN_TIMEOUT ,
150156 ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT );
157+ isTokenWatcherEnabled = conf .getBoolean (ZK_DTSM_TOKEN_WATCHER_ENABLED ,
158+ ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT );
151159 if (CURATOR_TL .get () != null ) {
152160 zkClient =
153161 CURATOR_TL .get ().usingNamespace (
@@ -369,34 +377,37 @@ public void childEvent(CuratorFramework client,
369377 } catch (Exception e ) {
370378 throw new IOException ("Could not start PathChildrenCache for keys" , e );
371379 }
372- try {
373- tokenCache = new PathChildrenCache (zkClient , ZK_DTSM_TOKENS_ROOT , true );
374- if (tokenCache != null ) {
375- tokenCache .start (StartMode .BUILD_INITIAL_CACHE );
376- tokenCache .getListenable ().addListener (new PathChildrenCacheListener () {
377-
378- @ Override
379- public void childEvent (CuratorFramework client ,
380- PathChildrenCacheEvent event ) throws Exception {
381- switch (event .getType ()) {
382- case CHILD_ADDED :
383- processTokenAddOrUpdate (event .getData ());
384- break ;
385- case CHILD_UPDATED :
386- processTokenAddOrUpdate (event .getData ());
387- break ;
388- case CHILD_REMOVED :
389- processTokenRemoved (event .getData ());
390- break ;
391- default :
392- break ;
380+ if (isTokenWatcherEnabled ) {
381+ LOG .info ("TokenCache is enabled" );
382+ try {
383+ tokenCache = new PathChildrenCache (zkClient , ZK_DTSM_TOKENS_ROOT , true );
384+ if (tokenCache != null ) {
385+ tokenCache .start (StartMode .BUILD_INITIAL_CACHE );
386+ tokenCache .getListenable ().addListener (new PathChildrenCacheListener () {
387+
388+ @ Override
389+ public void childEvent (CuratorFramework client ,
390+ PathChildrenCacheEvent event ) throws Exception {
391+ switch (event .getType ()) {
392+ case CHILD_ADDED :
393+ processTokenAddOrUpdate (event .getData ().getData ());
394+ break ;
395+ case CHILD_UPDATED :
396+ processTokenAddOrUpdate (event .getData ().getData ());
397+ break ;
398+ case CHILD_REMOVED :
399+ processTokenRemoved (event .getData ());
400+ break ;
401+ default :
402+ break ;
403+ }
393404 }
394- }
395- }, listenerThreadPool );
396- loadFromZKCache (true );
405+ }, listenerThreadPool );
406+ loadFromZKCache (true );
407+ }
408+ } catch (Exception e ) {
409+ throw new IOException ("Could not start PathChildrenCache for tokens" , e );
397410 }
398- } catch (Exception e ) {
399- throw new IOException ("Could not start PathChildrenCache for tokens" , e );
400411 }
401412 super .startThreads ();
402413 }
@@ -421,7 +432,7 @@ private void loadFromZKCache(final boolean isTokenCache) {
421432 for (ChildData child : children ) {
422433 try {
423434 if (isTokenCache ) {
424- processTokenAddOrUpdate (child );
435+ processTokenAddOrUpdate (child . getData () );
425436 } else {
426437 processKeyAddOrUpdate (child .getData ());
427438 }
@@ -443,9 +454,7 @@ private void processKeyAddOrUpdate(byte[] data) throws IOException {
443454 DataInputStream din = new DataInputStream (bin );
444455 DelegationKey key = new DelegationKey ();
445456 key .readFields (din );
446- synchronized (this ) {
447- allKeys .put (key .getKeyId (), key );
448- }
457+ allKeys .put (key .getKeyId (), key );
449458 }
450459
451460 private void processKeyRemoved (String path ) {
@@ -455,15 +464,13 @@ private void processKeyRemoved(String path) {
455464 int j = tokSeg .indexOf ('_' );
456465 if (j > 0 ) {
457466 int keyId = Integer .parseInt (tokSeg .substring (j + 1 ));
458- synchronized (this ) {
459- allKeys .remove (keyId );
460- }
467+ allKeys .remove (keyId );
461468 }
462469 }
463470 }
464471
465- private void processTokenAddOrUpdate (ChildData data ) throws IOException {
466- ByteArrayInputStream bin = new ByteArrayInputStream (data . getData () );
472+ protected TokenIdent processTokenAddOrUpdate (byte [] data ) throws IOException {
473+ ByteArrayInputStream bin = new ByteArrayInputStream (data );
467474 DataInputStream din = new DataInputStream (bin );
468475 TokenIdent ident = createIdentifier ();
469476 ident .readFields (din );
@@ -474,24 +481,18 @@ private void processTokenAddOrUpdate(ChildData data) throws IOException {
474481 if (numRead > -1 ) {
475482 DelegationTokenInformation tokenInfo =
476483 new DelegationTokenInformation (renewDate , password );
477- synchronized (this ) {
478- currentTokens .put (ident , tokenInfo );
479- // The cancel task might be waiting
480- notifyAll ();
481- }
484+ currentTokens .put (ident , tokenInfo );
485+ return ident ;
482486 }
487+ return null ;
483488 }
484489
485490 private void processTokenRemoved (ChildData data ) throws IOException {
486491 ByteArrayInputStream bin = new ByteArrayInputStream (data .getData ());
487492 DataInputStream din = new DataInputStream (bin );
488493 TokenIdent ident = createIdentifier ();
489494 ident .readFields (din );
490- synchronized (this ) {
491- currentTokens .remove (ident );
492- // The cancel task might be waiting
493- notifyAll ();
494- }
495+ currentTokens .remove (ident );
495496 }
496497
497498 @ Override
@@ -686,7 +687,7 @@ protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
686687 *
687688 * @param ident Identifier of the token
688689 */
689- private synchronized void syncLocalCacheWithZk (TokenIdent ident ) {
690+ protected void syncLocalCacheWithZk (TokenIdent ident ) {
690691 try {
691692 DelegationTokenInformation tokenInfo = getTokenInfoFromZK (ident );
692693 if (tokenInfo != null && !currentTokens .containsKey (ident )) {
@@ -700,16 +701,21 @@ private synchronized void syncLocalCacheWithZk(TokenIdent ident) {
700701 }
701702 }
702703
703- private DelegationTokenInformation getTokenInfoFromZK (TokenIdent ident )
704+ protected DelegationTokenInformation getTokenInfoFromZK (TokenIdent ident )
704705 throws IOException {
705706 return getTokenInfoFromZK (ident , false );
706707 }
707708
708- private DelegationTokenInformation getTokenInfoFromZK (TokenIdent ident ,
709+ protected DelegationTokenInformation getTokenInfoFromZK (TokenIdent ident ,
709710 boolean quiet ) throws IOException {
710711 String nodePath =
711712 getNodePath (ZK_DTSM_TOKENS_ROOT ,
712713 DELEGATION_TOKEN_PREFIX + ident .getSequenceNumber ());
714+ return getTokenInfoFromZK (nodePath , quiet );
715+ }
716+
717+ protected DelegationTokenInformation getTokenInfoFromZK (String nodePath ,
718+ boolean quiet ) throws IOException {
713719 try {
714720 byte [] data = zkClient .getData ().forPath (nodePath );
715721 if ((data == null ) || (data .length == 0 )) {
@@ -848,15 +854,30 @@ protected void updateToken(TokenIdent ident,
848854 @ Override
849855 protected void removeStoredToken (TokenIdent ident )
850856 throws IOException {
857+ removeStoredToken (ident , false );
858+ }
859+
860+ protected void removeStoredToken (TokenIdent ident ,
861+ boolean checkAgainstZkBeforeDeletion ) throws IOException {
851862 String nodeRemovePath =
852863 getNodePath (ZK_DTSM_TOKENS_ROOT , DELEGATION_TOKEN_PREFIX
853864 + ident .getSequenceNumber ());
854- if (LOG .isDebugEnabled ()) {
855- LOG .debug ("Removing ZKDTSMDelegationToken_"
856- + ident .getSequenceNumber ());
857- }
858865 try {
859- if (zkClient .checkExists ().forPath (nodeRemovePath ) != null ) {
866+ DelegationTokenInformation dtInfo = getTokenInfoFromZK (ident , true );
867+ if (dtInfo != null ) {
868+ // For the case there is no sync or watch miss, it is possible that the
869+ // local storage has expired tokens which have been renewed by peer
870+ // so double check again to avoid accidental delete
871+ if (checkAgainstZkBeforeDeletion
872+ && dtInfo .getRenewDate () > now ()) {
873+ LOG .info ("Node already renewed by peer " + nodeRemovePath +
874+ " so this token should not be deleted" );
875+ return ;
876+ }
877+ if (LOG .isDebugEnabled ()) {
878+ LOG .debug ("Removing ZKDTSMDelegationToken_"
879+ + ident .getSequenceNumber ());
880+ }
860881 while (zkClient .checkExists ().forPath (nodeRemovePath ) != null ){
861882 try {
862883 zkClient .delete ().guaranteed ().forPath (nodeRemovePath );
@@ -882,7 +903,7 @@ protected void removeStoredToken(TokenIdent ident)
882903 }
883904
884905 @ Override
885- public synchronized TokenIdent cancelToken (Token <TokenIdent > token ,
906+ public TokenIdent cancelToken (Token <TokenIdent > token ,
886907 String canceller ) throws IOException {
887908 ByteArrayInputStream buf = new ByteArrayInputStream (token .getIdentifier ());
888909 DataInputStream in = new DataInputStream (buf );
@@ -893,7 +914,7 @@ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
893914 return super .cancelToken (token , canceller );
894915 }
895916
896- private void addOrUpdateToken (TokenIdent ident ,
917+ protected void addOrUpdateToken (TokenIdent ident ,
897918 DelegationTokenInformation info , boolean isUpdate ) throws Exception {
898919 String nodeCreatePath =
899920 getNodePath (ZK_DTSM_TOKENS_ROOT , DELEGATION_TOKEN_PREFIX
@@ -920,6 +941,10 @@ private void addOrUpdateToken(TokenIdent ident,
920941 }
921942 }
922943
944+ public boolean isTokenWatcherEnabled () {
945+ return isTokenWatcherEnabled ;
946+ }
947+
923948 /**
924949 * Simple implementation of an {@link ACLProvider} that simply returns an ACL
925950 * that gives all permissions only to a single principal.
0 commit comments