diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java index 31b92dcf914e..4299a603ece9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java @@ -31,6 +31,7 @@ import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.BoundedExponentialBackoffRetry; import org.apache.hadoop.conf.Configuration; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,8 +75,48 @@ public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, fin .connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY, DEFAULT_ZK_CONNECTION_TIMEOUT_MS)) .build(); this.curatorFrameworkClient.start(); + createPathIfNotExists(); } + private String getLockPath() { + return lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/" + + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY); + } + + private void createPathIfNotExists() { + try { + String lockPath = getLockPath(); + LOG.info(String.format("Creating zookeeper path %s if not exists", lockPath)); + String[] parts = lockPath.split("/"); + StringBuilder currentPath = new StringBuilder(); + for (String part : parts) { + if (!part.isEmpty()) { + currentPath.append("/").append(part); + createNodeIfNotExists(currentPath.toString()); + } + } + } catch (Exception e) { + LOG.error("Failed to create ZooKeeper path: " + e.getMessage()); + throw new HoodieLockException("Failed to initialize ZooKeeper path", e); + } + } + + private void createNodeIfNotExists(String path) throws Exception { + if (this.curatorFrameworkClient.checkExists().forPath(path) == null) { + try { + this.curatorFrameworkClient.create().forPath(path); + // to avoid failure due to synchronous calls. + } catch (KeeperException e) { + if (e.code() == KeeperException.Code.NODEEXISTS) { + LOG.debug(String.format("Node already exist for path = %s", path)); + } else { + throw new HoodieLockException("Failed to create zookeeper node", e); + } + } + } + } + + // Only used for testing public ZookeeperBasedLockProvider( final LockConfiguration lockConfiguration, final CuratorFramework curatorFrameworkClient) { @@ -85,6 +126,7 @@ public ZookeeperBasedLockProvider( synchronized (this.curatorFrameworkClient) { if (this.curatorFrameworkClient.getState() != CuratorFrameworkState.STARTED) { this.curatorFrameworkClient.start(); + createPathIfNotExists(); } } }