Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-7198]Create nested node path if does not exist for zookeeper. #10438

Merged
merged 1 commit into from
Jan 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,8 +75,48 @@ public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, fin
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY, DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
.build();
this.curatorFrameworkClient.start();
createPathIfNotExists();
}

private String getLockPath() {
return lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
+ this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
}

private void createPathIfNotExists() {
try {
String lockPath = getLockPath();
LOG.info(String.format("Creating zookeeper path %s if not exists", lockPath));
String[] parts = lockPath.split("/");
StringBuilder currentPath = new StringBuilder();
for (String part : parts) {
if (!part.isEmpty()) {
currentPath.append("/").append(part);
createNodeIfNotExists(currentPath.toString());
}
}
} catch (Exception e) {
LOG.error("Failed to create ZooKeeper path: " + e.getMessage());
throw new HoodieLockException("Failed to initialize ZooKeeper path", e);
}
}

private void createNodeIfNotExists(String path) throws Exception {
if (this.curatorFrameworkClient.checkExists().forPath(path) == null) {
try {
this.curatorFrameworkClient.create().forPath(path);
// to avoid failure due to synchronous calls.
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.NODEEXISTS) {
LOG.debug(String.format("Node already exist for path = %s", path));
} else {
throw new HoodieLockException("Failed to create zookeeper node", e);
}
}
}
}


// Only used for testing
public ZookeeperBasedLockProvider(
final LockConfiguration lockConfiguration, final CuratorFramework curatorFrameworkClient) {
Expand All @@ -85,6 +126,7 @@ public ZookeeperBasedLockProvider(
synchronized (this.curatorFrameworkClient) {
if (this.curatorFrameworkClient.getState() != CuratorFrameworkState.STARTED) {
this.curatorFrameworkClient.start();
createPathIfNotExists();
}
}
}
Expand Down
Loading