Skip to content

HADOOP-16579. Upgrade to Curator 4.2.0 and ZooKeeper 3.5.5 #1656

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -36,7 +35,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -368,7 +367,7 @@ protected CuratorFramework createCuratorClient(Properties config)
LOG.info("Connecting to ZooKeeper with SASL/Kerberos"
+ "and using 'sasl' ACLs");
String principal = setJaasConfiguration(config);
System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
System.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
JAAS_LOGIN_ENTRY_NAME);
System.setProperty("zookeeper.authProvider.1",
"org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,14 +442,16 @@ private void recordActiveAttempt(
* </ul>
*
* @param timeoutMillis number of millis to wait
* @param onlyAfterNanoTime accept only attempt records whose timestamp is at
* or after this {@link System#nanoTime()} value. Use this parameter to ignore
* old attempt records left over from a previous fail-over attempt.
* @return the published record, or null if the timeout elapses or the
* service becomes unhealthy
* @throws InterruptedException if the thread is interrupted.
*/
private ActiveAttemptRecord waitForActiveAttempt(int timeoutMillis)
throws InterruptedException {
long st = System.nanoTime();
long waitUntil = st + TimeUnit.NANOSECONDS.convert(
private ActiveAttemptRecord waitForActiveAttempt(int timeoutMillis,
long onlyAfterNanoTime) throws InterruptedException {
long waitUntil = onlyAfterNanoTime + TimeUnit.NANOSECONDS.convert(
timeoutMillis, TimeUnit.MILLISECONDS);

do {
Expand All @@ -466,7 +468,7 @@ private ActiveAttemptRecord waitForActiveAttempt(int timeoutMillis)

synchronized (activeAttemptRecordLock) {
if ((lastActiveAttemptRecord != null &&
lastActiveAttemptRecord.nanoTime >= st)) {
lastActiveAttemptRecord.nanoTime >= onlyAfterNanoTime)) {
return lastActiveAttemptRecord;
}
// Only wait 1sec so that we periodically recheck the health state
Expand Down Expand Up @@ -660,6 +662,7 @@ private void doGracefulFailover()
List<ZKFCProtocol> otherZkfcs = new ArrayList<ZKFCProtocol>(otherNodes.size());

// Phase 3: ask the other nodes to yield from the election.
long st = System.nanoTime();
HAServiceTarget activeNode = null;
for (HAServiceTarget remote : otherNodes) {
// same location, same node - may not always be == equality
Expand All @@ -678,7 +681,7 @@ private void doGracefulFailover()

// Phase 4: wait for the normal election to make the local node
// active.
ActiveAttemptRecord attempt = waitForActiveAttempt(timeout + 60000);
ActiveAttemptRecord attempt = waitForActiveAttempt(timeout + 60000, st);

if (attempt == null) {
// We didn't even make an attempt to become active.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
Expand Down Expand Up @@ -173,8 +173,8 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
LOG.info("Connecting to ZooKeeper with SASL/Kerberos"
+ "and using 'sasl' ACLs");
String principal = setJaasConfiguration(conf);
System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
JAAS_LOGIN_ENTRY_NAME);
System.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
JAAS_LOGIN_ENTRY_NAME);
System.setProperty("zookeeper.authProvider.1",
"org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
aclProvider = new SASLOwnerACLProvider(principal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -387,43 +387,45 @@ public SafeTransaction createTransaction(List<ACL> fencingACL,
/**
 * Use curator transactions to ensure zk-operations are performed in an all
 * or nothing fashion. This is equivalent to using ZooKeeper#multi.
 *
 * <p>Operations are queued as {@link CuratorOp}s and only sent to ZooKeeper
 * when {@link #commit()} is called. The queue always starts with the creation
 * of the fencing node (added by the constructor) and ends with its deletion
 * (added by {@link #commit()}).
 * NOTE(review): presumably the fencing node makes the whole transaction fail
 * when the caller is not allowed to create it under the fencing ACL -
 * confirm against the callers.
 */
public class SafeTransaction {
  /** Path of the fencing node created first and deleted last. */
  private final String fencingNodePath;
  /** Operations queued so far, in submission order. */
  private final List<CuratorOp> curatorOperations = new LinkedList<>();

  /**
   * Starts a new transaction whose first operation creates the fencing node.
   *
   * @param fencingACL ACL applied to the fencing node.
   * @param fencingNodePath path of the fencing node.
   * @throws Exception if the create operation cannot be built.
   */
  SafeTransaction(List<ACL> fencingACL, String fencingNodePath)
      throws Exception {
    this.fencingNodePath = fencingNodePath;
    curatorOperations.add(curator.transactionOp().create()
        .withMode(CreateMode.PERSISTENT)
        .withACL(fencingACL)
        .forPath(fencingNodePath, new byte[0]));
  }

  /**
   * Appends the deletion of the fencing node and submits every queued
   * operation as one transaction. The queue is emptied after a successful
   * submission; if the submission throws, the queue is left untouched.
   *
   * @throws Exception if the transaction cannot be built or submitted.
   */
  public void commit() throws Exception {
    curatorOperations.add(curator.transactionOp().delete()
        .forPath(fencingNodePath));
    curator.transaction().forOperations(curatorOperations);
    curatorOperations.clear();
  }

  /**
   * Queues the creation of a node.
   *
   * @param path path of the node to create.
   * @param data initial content of the node.
   * @param acl ACL applied to the node.
   * @param mode create mode of the node.
   * @throws Exception if the operation cannot be built.
   */
  public void create(String path, byte[] data, List<ACL> acl, CreateMode mode)
      throws Exception {
    curatorOperations.add(curator.transactionOp().create()
        .withMode(mode)
        .withACL(acl)
        .forPath(path, data));
  }

  /**
   * Queues the deletion of a node.
   *
   * @param path path of the node to delete.
   * @throws Exception if the operation cannot be built.
   */
  public void delete(String path) throws Exception {
    curatorOperations.add(curator.transactionOp().delete()
        .forPath(path));
  }

  /**
   * Queues an update of a node's content, conditional on its version.
   *
   * @param path path of the node to update.
   * @param data new content of the node.
   * @param version version passed to the set-data operation.
   * @throws Exception if the operation cannot be built.
   */
  public void setData(String path, byte[] data, int version)
      throws Exception {
    curatorOperations.add(curator.transactionOp().setData()
        .withVersion(version)
        .forPath(path, data));
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
public static int CONNECTION_TIMEOUT = 30000;
static final File BASETEST = GenericTestUtils.getTestDir();

static {
// Runs once, when this class is loaded, before any test server is started.
// The "four-letter-word" (4lw) commands are simple diagnostic commands that
// can be sent to ZooKeeper over telnet. Since ZooKeeper 3.5 they are
// disabled by default due to security concerns; see
// https://issues.apache.org/jira/browse/ZOOKEEPER-2693
// They are re-enabled here for the tests, because some tests in Hadoop or
// in other projects might still rely on them.
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
}

protected final String hostPort = initHostPort();
protected int maxCnxns = 0;
protected ServerCnxnFactory serverFactory = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.util.ZKUtil;
import org.apache.zookeeper.Environment;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
Expand Down Expand Up @@ -769,19 +770,19 @@ public void applySecurityEnvironment(CuratorFrameworkFactory.Builder
JaasConfiguration jconf =
new JaasConfiguration(jaasClientEntry, principal, keytab);
javax.security.auth.login.Configuration.setConfiguration(jconf);
setSystemPropertyIfUnset(ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY,
"true");
setSystemPropertyIfUnset(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
jaasClientEntry);
setSystemPropertyIfUnset(ZKClientConfig.ENABLE_CLIENT_SASL_KEY,
"true");
setSystemPropertyIfUnset(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
jaasClientEntry);
} else {
// in this case, jaas config is specified so we will not change it
LOG.info("Using existing ZK sasl configuration: " +
"jaasClientEntry = " + System.getProperty(
ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") +
", sasl client = " + System.getProperty(
ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY,
ZooKeeperSaslClient.ENABLE_CLIENT_SASL_DEFAULT) +
", jaas = " + existingJaasConf);
"jaasClientEntry = " + System.getProperty(
ZKClientConfig.LOGIN_CONTEXT_NAME_KEY, "Client") +
", sasl client = " + System.getProperty(
ZKClientConfig.ENABLE_CLIENT_SASL_KEY,
ZKClientConfig.ENABLE_CLIENT_SASL_DEFAULT) +
", jaas = " + existingJaasConf);
}
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.hadoop.registry.client.impl.zk;

import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.server.ZooKeeperSaslServer;

/**
Expand Down Expand Up @@ -62,10 +62,10 @@ public interface ZookeeperConfigOptions {
*
* <p>
* Default value is derived from
* {@link ZooKeeperSaslClient#LOGIN_CONTEXT_NAME_KEY}
* {@link ZKClientConfig#LOGIN_CONTEXT_NAME_KEY}
*/
String PROP_ZK_SASL_CLIENT_CONTEXT =
ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY;
ZKClientConfig.LOGIN_CONTEXT_NAME_KEY;

/**
* The SASL client username: {@value}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;

/**
Expand Down Expand Up @@ -121,7 +122,7 @@ public InetSocketAddress getConnectionAddress() {
* @throws UnknownHostException if the server cannot resolve the host
*/
private InetSocketAddress getAddress(int port) throws UnknownHostException {
  // A non-positive port means no fixed port was requested, so a concrete
  // free port is picked here. If that selection fails, the helper returns 0
  // and the address is built with port 0.
  return new InetSocketAddress(host, port <= 0 ? getRandomAvailablePort() : port);
}

/**
Expand Down Expand Up @@ -227,10 +228,8 @@ protected void serviceStart() throws Exception {

setupSecurity();

ZooKeeperServer zkServer = new ZooKeeperServer();
FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir);
zkServer.setTxnLogFactory(ftxn);
zkServer.setTickTime(tickTime);
ZooKeeperServer zkServer = new ZooKeeperServer(ftxn, tickTime);

LOG.info("Starting Local Zookeeper service");
factory = ServerCnxnFactory.createFactory();
Expand All @@ -245,7 +244,7 @@ protected void serviceStart() throws Exception {
PrintWriter pw = new PrintWriter(sw);
zkServer.dumpConf(pw);
pw.flush();
LOG.debug(sw.toString());
LOG.debug("ZooKeeper config:\n" + sw.toString());
}
binding = new BindingInformation();
binding.ensembleProvider = new FixedEnsembleProvider(connectString);
Expand Down Expand Up @@ -279,4 +278,20 @@ public BindingInformation supplyBindingInformation() {
"Service is not started: binding information undefined");
return binding;
}

/**
 * Returns a random port that is currently free and can be used as the server
 * port for ZooKeeper.
 *
 * <p>The port is found by briefly binding a {@link ServerSocket} to port 0
 * and reading back the port it was given. The socket is closed before this
 * method returns, so nothing reserves the port afterwards: another process
 * may take it before the ZooKeeper server binds to it.
 *
 * <p>The selected value is also stored in {@code port}, which is declared
 * outside this method.
 *
 * @return a random open port or 0 (in case of error)
 */
private int getRandomAvailablePort() {
  port = 0;
  // try-with-resources guarantees the probe socket is closed on every path.
  try (ServerSocket s = new ServerSocket(0)) {
    port = s.getLocalPort();
  } catch (IOException e) {
    LOG.warn("ERROR during selecting random port for ZooKeeper server to bind." , e);
  }
  return port;
}
}
Loading