Skip to content

Commit

Permalink
Create nested node path if does not exist for zookeeper.
Browse files Browse the repository at this point in the history
Catch KeeperException if node already exist.
  • Loading branch information
harsh1231 committed Jan 3, 2024
1 parent 12c2634 commit 8624ba5
Showing 1 changed file with 42 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,8 +75,48 @@ public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, fin
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY, DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
.build();
this.curatorFrameworkClient.start();
createPathIfNotExists();
}

private String getLockPath() {
return lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
+ this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
}

private void createPathIfNotExists() {
try {
String lockPath = getLockPath();
LOG.info(String.format("Creating zookeeper path %s if not exists", lockPath));
String[] parts = lockPath.split("/");
StringBuilder currentPath = new StringBuilder();
for (String part : parts) {
if (!part.isEmpty()) {
currentPath.append("/").append(part);
createNodeIfNotExists(currentPath.toString());
}
}
} catch (Exception e) {
LOG.error("Failed to create ZooKeeper path: " + e.getMessage());
throw new HoodieLockException("Failed to initialize ZooKeeper path", e);
}
}

private void createNodeIfNotExists(String path) throws Exception {
if (this.curatorFrameworkClient.checkExists().forPath(path) == null) {
try {
this.curatorFrameworkClient.create().forPath(path);
// to avoid failure due to synchronous calls.
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.NODEEXISTS) {
LOG.debug(String.format("Node already exist for path = %s", path));
} else {
throw new HoodieLockException("Failed to create zookeeper node", e);
}
}
}
}


// Only used for testing
public ZookeeperBasedLockProvider(
final LockConfiguration lockConfiguration, final CuratorFramework curatorFrameworkClient) {
Expand All @@ -85,6 +126,7 @@ public ZookeeperBasedLockProvider(
synchronized (this.curatorFrameworkClient) {
if (this.curatorFrameworkClient.getState() != CuratorFrameworkState.STARTED) {
this.curatorFrameworkClient.start();
createPathIfNotExists();
}
}
}
Expand Down

0 comments on commit 8624ba5

Please sign in to comment.