Skip to content

Commit

Permalink
HADOOP-18922. Race condition in ZKDelegationTokenSecretManager creati…
Browse files Browse the repository at this point in the history
…ng znode (apache#6150). Contributed by Kevin Risden. (apache#6179)

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
  • Loading branch information
risdenk authored Oct 17, 2023
1 parent 553b8ff commit 78fc23e
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -268,10 +267,9 @@ public void startThreads() throws IOException {
CuratorFramework nullNsFw = zkClient.usingNamespace(null);
try {
String nameSpace = "/" + zkClient.getNamespace();
Stat stat = nullNsFw.checkExists().forPath(nameSpace);
if (stat == null) {
nullNsFw.create().creatingParentContainersIfNeeded().forPath(nameSpace);
}
nullNsFw.create().creatingParentContainersIfNeeded().forPath(nameSpace);
} catch (KeeperException.NodeExistsException ignore) {
// We don't care if the znode already exists
} catch (Exception e) {
throw new IOException("Could not create namespace", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
Expand Down Expand Up @@ -572,4 +578,53 @@ public void testCreateNameSpaceRepeatedly() throws Exception {
"KeeperErrorCode = NodeExists for "+workingPath,
() -> createModeStat.forPath(workingPath));
}

@Test
public void testMultipleInit() throws Exception {

String connectString = zkServer.getConnectString();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
Configuration conf = getSecretConf(connectString);
CuratorFramework curatorFramework =
CuratorFrameworkFactory.builder()
.connectString(connectString)
.retryPolicy(retryPolicy)
.build();
curatorFramework.start();
ZKDelegationTokenSecretManager.setCurator(curatorFramework);

DelegationTokenManager tm1 = new DelegationTokenManager(conf, new Text("foo"));
DelegationTokenManager tm2 = new DelegationTokenManager(conf, new Text("bar"));
// When the init method is called,
// the ZKDelegationTokenSecretManager#startThread method will be called,
// and the creatingParentContainersIfNeeded will be called to create the nameSpace.
ExecutorService executorService = Executors.newFixedThreadPool(2);

Callable<Boolean> tm1Callable = () -> {
tm1.init();
return true;
};
Callable<Boolean> tm2Callable = () -> {
tm2.init();
return true;
};
List<Future<Boolean>> futures = executorService.invokeAll(
Arrays.asList(tm1Callable, tm2Callable));
for(Future<Boolean> future : futures) {
Assert.assertTrue(future.get());
}
executorService.shutdownNow();
Assert.assertTrue(executorService.awaitTermination(1, TimeUnit.SECONDS));
tm1.destroy();
tm2.destroy();

String workingPath = "/" + conf.get(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH,
ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT) + "/ZKDTSMRoot";

// Check if the created NameSpace exists.
Stat stat = curatorFramework.checkExists().forPath(workingPath);
Assert.assertNotNull(stat);

curatorFramework.close();
}
}

0 comments on commit 78fc23e

Please sign in to comment.