diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala index 008f553231a..31c40b58a2e 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala @@ -42,18 +42,18 @@ abstract class ShareLevelSparkEngineSuite } test("check discovery service is clean up with different share level") { - withZkClient { zkClient => + withDiscoveryClient { discoveryClient => assert(engine.getServiceState == ServiceState.STARTED) - assert(zkClient.checkExists().forPath(namespace) != null) + assert(discoveryClient.pathExists(namespace)) withJdbcStatement() { _ => } shareLevel match { // Connection level, we will cleanup namespace since it's always a global unique value. case ShareLevel.CONNECTION => assert(engine.getServiceState == ServiceState.STOPPED) - assert(zkClient.checkExists().forPath(namespace) == null) + assert(discoveryClient.pathNonExists(namespace)) case _ => assert(engine.getServiceState == ServiceState.STARTED) - assert(zkClient.checkExists().forPath(namespace) != null) + assert(discoveryClient.pathExists(namespace)) } } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala index 481b8ec7e7f..dc3cff7e3b4 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala @@ -17,12 +17,12 @@ package org.apache.kyuubi.engine.spark -import org.apache.curator.framework.CuratorFramework - import org.apache.kyuubi.Utils import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_AUTH_TYPE, HA_ZK_NAMESPACE, HA_ZK_QUORUM} -import org.apache.kyuubi.ha.client.{ZooKeeperAuthTypes, ZooKeeperClientProvider} +import org.apache.kyuubi.ha.client.AuthTypes +import org.apache.kyuubi.ha.client.DiscoveryClientProvider +import org.apache.kyuubi.ha.client.zookeeper.DiscoveryClient import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf} trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine { @@ -32,7 +32,7 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine { assert(zkServer != null) Map( HA_ZK_QUORUM.key -> zkServer.getConnectString, - HA_ZK_AUTH_TYPE.key -> ZooKeeperAuthTypes.NONE.toString, + HA_ZK_AUTH_TYPE.key -> AuthTypes.NONE.toString, HA_ZK_NAMESPACE.key -> namespace) } @@ -62,8 +62,8 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine { stopSparkEngine() } - def withZkClient(f: CuratorFramework => Unit): Unit = { - ZooKeeperClientProvider.withZkClient(kyuubiConf)(f) + def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = { + DiscoveryClientProvider.withDiscoveryClient(kyuubiConf)(f) } protected def getDiscoveryConnectionString: String = { diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala index 5dc4fe8ad83..01064421571 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala @@ -19,15 +19,13 @@ package org.apache.kyuubi.ctl import scala.collection.mutable.ListBuffer -import org.apache.curator.framework.CuratorFramework -import org.apache.curator.utils.ZKPaths - import org.apache.kyuubi.Logging import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN import org.apache.kyuubi.config.KyuubiConf.ENGINE_TYPE import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.ha.HighAvailabilityConf._ -import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider} +import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo} +import org.apache.kyuubi.ha.client.zookeeper.DiscoveryClient private[ctl] object ServiceControlAction extends Enumeration { type ServiceControlAction = Value @@ -43,9 +41,8 @@ private[ctl] object ServiceControlObject extends Enumeration { * Main gateway of launching a Kyuubi Ctl action. */ private[kyuubi] class ServiceControlCli extends Logging { + import DiscoveryClientProvider._ import ServiceControlCli._ - import ServiceDiscovery._ - import ZooKeeperClientProvider._ private var verbose: Boolean = false @@ -84,21 +81,20 @@ private[kyuubi] class ServiceControlCli extends Logging { val kyuubiConf = args.conf kyuubiConf.setIfMissing(HA_ZK_QUORUM, args.cliArgs.zkQuorum) - withZkClient(kyuubiConf) { zkClient => - val fromNamespace = ZKPaths.makePath(null, kyuubiConf.get(HA_ZK_NAMESPACE)) + withDiscoveryClient(kyuubiConf) { discoveryClient => + val fromNamespace = DiscoveryClient.makePath(null, kyuubiConf.get(HA_ZK_NAMESPACE)) val toNamespace = getZkNamespace(args) - val currentServerNodes = getServiceNodesInfo(zkClient, fromNamespace) + val currentServerNodes = discoveryClient.getServiceNodesInfo(fromNamespace) val exposedServiceNodes = ListBuffer[ServiceNodeInfo]() if (currentServerNodes.nonEmpty) { - def doCreate(zc: CuratorFramework): Unit = { + def doCreate(zc: DiscoveryClient): Unit = { currentServerNodes.foreach { sn => info(s"Exposing server instance:${sn.instance} with version:${sn.version}" + s" from $fromNamespace to $toNamespace") - val newNodePath = createAndGetServiceNode( + val newNodePath = zc.createAndGetServiceNode( kyuubiConf, - zc, args.cliArgs.namespace, sn.instance, sn.version, @@ -110,10 +106,10 @@ private[kyuubi] class ServiceControlCli extends Logging { } if (kyuubiConf.get(HA_ZK_QUORUM) == args.cliArgs.zkQuorum) { - doCreate(zkClient) + doCreate(discoveryClient) } else { kyuubiConf.set(HA_ZK_QUORUM, args.cliArgs.zkQuorum) - withZkClient(kyuubiConf)(doCreate) + withDiscoveryClient(kyuubiConf)(doCreate) } } @@ -126,13 +122,13 @@ private[kyuubi] class ServiceControlCli extends Logging { * List Kyuubi server nodes info. */ private def list(args: ServiceControlCliArguments, filterHostPort: Boolean): Unit = { - withZkClient(args.conf) { zkClient => + withDiscoveryClient(args.conf) { discoveryClient => val znodeRoot = getZkNamespace(args) val hostPortOpt = if (filterHostPort) { Some((args.cliArgs.host, args.cliArgs.port.toInt)) } else None - val nodes = getServiceNodes(zkClient, znodeRoot, hostPortOpt) + val nodes = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt) val title = "Zookeeper service nodes" info(renderServiceNodesInfo(title, nodes, verbose)) @@ -140,10 +136,10 @@ private[kyuubi] class ServiceControlCli extends Logging { } private def getServiceNodes( - zkClient: CuratorFramework, + discoveryClient: DiscoveryClient, znodeRoot: String, hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = { - val serviceNodes = getServiceNodesInfo(zkClient, znodeRoot) + val serviceNodes = discoveryClient.getServiceNodesInfo(znodeRoot) hostPortOpt match { case Some((host, port)) => serviceNodes.filter { sn => sn.host == host && sn.port == port @@ -156,17 +152,17 @@ private[kyuubi] class ServiceControlCli extends Logging { * Delete zookeeper service node with specified host port. */ private def delete(args: ServiceControlCliArguments): Unit = { - withZkClient(args.conf) { zkClient => + withDiscoveryClient(args.conf) { discoveryClient => val znodeRoot = getZkNamespace(args) val hostPortOpt = Some((args.cliArgs.host, args.cliArgs.port.toInt)) - val nodesToDelete = getServiceNodes(zkClient, znodeRoot, hostPortOpt) + val nodesToDelete = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt) val deletedNodes = ListBuffer[ServiceNodeInfo]() nodesToDelete.foreach { node => val nodePath = s"$znodeRoot/${node.nodeName}" info(s"Deleting zookeeper service node:$nodePath") try { - zkClient.delete().forPath(nodePath) + discoveryClient.delete(nodePath) deletedNodes += node } catch { case e: Exception => @@ -227,7 +223,7 @@ object ServiceControlCli extends CommandLineUtils with Logging { private[ctl] def getZkNamespace(args: ServiceControlCliArguments): String = { args.cliArgs.service match { case ServiceControlObject.SERVER => - ZKPaths.makePath(null, args.cliArgs.namespace) + DiscoveryClient.makePath(null, args.cliArgs.namespace) case ServiceControlObject.ENGINE => val engineType = Some(args.cliArgs.engineType) .filter(_ != null).filter(_.nonEmpty) @@ -235,10 +231,10 @@ object ServiceControlCli extends CommandLineUtils with Logging { val engineSubdomain = Some(args.cliArgs.engineSubdomain) .filter(_ != null).filter(_.nonEmpty) .getOrElse(args.conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse("default")) - ZKPaths.makePath( + DiscoveryClient.makePath( s"${args.cliArgs.namespace}_${ShareLevel.USER}_${engineType}", args.cliArgs.user, - engineSubdomain) + Array(engineSubdomain)) } } diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala index 79165baa93f..7474c9f03c8 100644 --- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala +++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala @@ -20,13 +20,12 @@ package org.apache.kyuubi.ctl import java.io.{OutputStream, PrintStream} import java.util.concurrent.atomic.AtomicInteger -import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_NAMESPACE, HA_ZK_QUORUM} -import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider} +import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo} import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf} trait TestPrematureExit { @@ -86,9 +85,8 @@ trait TestPrematureExit { } class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { + import DiscoveryClientProvider._ import ServiceControlCli._ - import ServiceDiscovery._ - import ZooKeeperClientProvider._ val zkServer = new EmbeddedZookeeper() val conf: KyuubiConf = KyuubiConf() @@ -226,9 +224,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) System.setProperty(HA_ZK_NAMESPACE.key, uniqueNamespace) - withZkClient(conf) { framework => - createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000") - createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001") + withDiscoveryClient(conf) { framework => + framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000") + framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001") val newNamespace = getUniqueNamespace() val args = Array( @@ -245,7 +243,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false)) val znodeRoot = s"/$newNamespace" - val children = framework.getChildren.forPath(znodeRoot).asScala.sorted + val children = framework.getChildren(znodeRoot).sorted assert(children.size == 2) assert(children.head.startsWith( @@ -253,7 +251,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { assert(children.last.startsWith( s"serviceUri=localhost:10001;version=$KYUUBI_VERSION;sequence=")) children.foreach { child => - framework.delete().forPath(s"""$znodeRoot/$child""") + framework.delete(s"""$znodeRoot/$child""") } } } @@ -290,9 +288,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { .set(HA_ZK_NAMESPACE, uniqueNamespace) .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) - withZkClient(conf) { framework => - createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000") - createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001") + withDiscoveryClient(conf) { framework => + framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000") + framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001") val args = Array( "list", @@ -319,9 +317,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { .set(HA_ZK_NAMESPACE, uniqueNamespace) .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) - withZkClient(conf) { framework => - createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000") - createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001") + withDiscoveryClient(conf) { framework => + framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000") + framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001") val args = Array( "get", @@ -351,10 +349,10 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { .set(HA_ZK_NAMESPACE, uniqueNamespace) .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) - withZkClient(conf) { framework => - withZkClient(conf) { zc => - createAndGetServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true) - createAndGetServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true) + withDiscoveryClient(conf) { framework => + withDiscoveryClient(conf) { zc => + framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000", external = true) + framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001", external = true) } val args = Array( @@ -385,9 +383,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit { .set(HA_ZK_NAMESPACE, uniqueNamespace) .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) - withZkClient(conf) { framework => - createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000") - createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001") + withDiscoveryClient(conf) { framework => + framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000") + framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001") val args = Array( "list", diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala index 8ee3cc18036..d283767db34 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala @@ -22,7 +22,8 @@ import java.time.Duration import org.apache.hadoop.security.UserGroupInformation import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry} -import org.apache.kyuubi.ha.client.{RetryPolicies, ZooKeeperAuthTypes} +import org.apache.kyuubi.ha.client.AuthTypes +import org.apache.kyuubi.ha.client.RetryPolicies object HighAvailabilityConf { @@ -51,16 +52,16 @@ object HighAvailabilityConf { val HA_ZK_AUTH_TYPE: ConfigEntry[String] = buildConf("ha.zookeeper.auth.type") .doc("The type of zookeeper authentication, all candidates are " + - s"${ZooKeeperAuthTypes.values.mkString("")}") + s"${AuthTypes.values.mkString("")}") .version("1.3.2") .stringConf - .checkValues(ZooKeeperAuthTypes.values.map(_.toString)) - .createWithDefault(ZooKeeperAuthTypes.NONE.toString) + .checkValues(AuthTypes.values.map(_.toString)) + .createWithDefault(AuthTypes.NONE.toString) val HA_ZK_ENGINE_AUTH_TYPE: ConfigEntry[String] = buildConf("ha.zookeeper.engine.auth.type") .doc("The type of zookeeper authentication for engine, all candidates are " + - s"${ZooKeeperAuthTypes.values.mkString("")}") + s"${AuthTypes.values.mkString("")}") .version("1.3.2") .fallbackConf(HA_ZK_AUTH_TYPE) diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/AuthTypes.scala similarity index 91% rename from kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala rename to kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/AuthTypes.scala index da28db730b0..0c02764fb9c 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/AuthTypes.scala @@ -17,9 +17,9 @@ package org.apache.kyuubi.ha.client -object ZooKeeperAuthTypes extends Enumeration { +object AuthTypes extends Enumeration { - type ZooKeeperAuthType = Value + type AuthTypes = Value val NONE, KERBEROS, DIGEST = Value diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClientProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClientProvider.scala new file mode 100644 index 00000000000..8650e135dc6 --- /dev/null +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClientProvider.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.ha.client + +import java.io.IOException + +import org.apache.kyuubi.Logging +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.ha.client.zookeeper.DiscoveryClient + +object DiscoveryClientProvider extends Logging { + + /** + * Creates a zookeeper client before calling `f` and close it after calling `f`. + */ + def withDiscoveryClient[T](conf: KyuubiConf)(f: DiscoveryClient => T): T = { + val discoveryClient = new DiscoveryClient(conf) + try { + discoveryClient.createClient() + f(discoveryClient) + } finally { + try { + discoveryClient.closeClient() + } catch { + case e: IOException => error("Failed to release the zkClient", e) + } + } + } + +} diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala index 1723561a461..c6868555dd4 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala @@ -17,20 +17,12 @@ package org.apache.kyuubi.ha.client -import java.nio.charset.StandardCharsets import java.util.concurrent.atomic.AtomicBoolean -import scala.collection.JavaConverters._ - -import com.google.common.annotations.VisibleForTesting -import org.apache.curator.framework.CuratorFramework -import org.apache.curator.utils.ZKPaths - import org.apache.kyuubi.Logging import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient -import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode import org.apache.kyuubi.service.{AbstractService, FrontendService} /** @@ -80,74 +72,4 @@ object ServiceDiscovery extends Logging { val zkEnsemble = conf.get(HA_ZK_QUORUM) zkEnsemble != null && zkEnsemble.nonEmpty } - - def getServerHost(zkClient: CuratorFramework, namespace: String): Option[(String, Int)] = { - // TODO: use last one because to avoid touching some maybe-crashed engines - // We need a big improvement here. - getServiceNodesInfo(zkClient, namespace, Some(1), silent = true) match { - case Seq(sn) => Some((sn.host, sn.port)) - case _ => None - } - } - - def getEngineByRefId( - zkClient: CuratorFramework, - namespace: String, - engineRefId: String): Option[(String, Int)] = { - getServiceNodesInfo(zkClient, namespace, silent = true) - .find(_.engineRefId.exists(_.equals(engineRefId))) - .map(data => (data.host, data.port)) - } - - def getServiceNodesInfo( - zkClient: CuratorFramework, - namespace: String, - sizeOpt: Option[Int] = None, - silent: Boolean = false): Seq[ServiceNodeInfo] = { - try { - val hosts = zkClient.getChildren.forPath(namespace) - val size = sizeOpt.getOrElse(hosts.size()) - hosts.asScala.takeRight(size).map { p => - val path = ZKPaths.makePath(namespace, p) - val instance = new String(zkClient.getData.forPath(path), StandardCharsets.UTF_8) - val (host, port) = parseInstanceHostPort(instance) - val version = p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version=")) - val engineRefId = p.split(";").find(_.startsWith("refId=")).map(_.stripPrefix("refId=")) - info(s"Get service instance:$instance and version:$version under $namespace") - ServiceNodeInfo(namespace, p, host, port, version, engineRefId) - } - } catch { - case _: Exception if silent => Nil - case e: Exception => - error(s"Failed to get service node info", e) - Nil - } - } - - @VisibleForTesting - private[client] def parseInstanceHostPort(instance: String): (String, Int) = { - val maybeInfos = instance.split(";") - .map(_.split("=", 2)) - .filter(_.size == 2) - .map(i => (i(0), i(1))) - .toMap - if (maybeInfos.size > 0) { - ( - maybeInfos.get("hive.server2.thrift.bind.host").get, - maybeInfos.get("hive.server2.thrift.port").get.toInt) - } else { - val strings = instance.split(":") - (strings(0), strings(1).toInt) - } - } - - def createAndGetServiceNode( - conf: KyuubiConf, - zkClient: CuratorFramework, - namespace: String, - instance: String, - version: Option[String] = None, - external: Boolean = false): String = { - createServiceNode(conf, zkClient, namespace, instance, version, external).getActualPath - } } diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperACLProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ACLProviderWrapper.scala similarity index 86% rename from kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperACLProvider.scala rename to kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ACLProviderWrapper.scala index 654cb8f8da5..1cd94f2e98b 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperACLProvider.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ACLProviderWrapper.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kyuubi.ha.client +package org.apache.kyuubi.ha.client.zookeeper import org.apache.curator.framework.api.ACLProvider import org.apache.zookeeper.ZooDefs @@ -23,8 +23,9 @@ import org.apache.zookeeper.data.ACL import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf +import org.apache.kyuubi.ha.client.AuthTypes -class ZooKeeperACLProvider(conf: KyuubiConf) extends ACLProvider { +class ACLProviderWrapper(conf: KyuubiConf) extends ACLProvider { /** * Return the ACL list to use by default. @@ -53,15 +54,15 @@ class ZooKeeperACLProvider(conf: KyuubiConf) extends ACLProvider { nodeAcls } - private def enabledServerAcls(): Boolean = ZooKeeperAuthTypes + private def enabledServerAcls(): Boolean = AuthTypes .withName(conf.get(HighAvailabilityConf.HA_ZK_AUTH_TYPE)) match { - case ZooKeeperAuthTypes.NONE => false + case AuthTypes.NONE => false case _ => true } - private def enabledEngineAcls(): Boolean = ZooKeeperAuthTypes + private def enabledEngineAcls(): Boolean = AuthTypes .withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE)) match { - case ZooKeeperAuthTypes.NONE => false + case AuthTypes.NONE => false case _ => true } diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ClientProvider.scala similarity index 85% rename from kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala rename to kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ClientProvider.scala index fa7d366edb1..f41e5c071c6 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ClientProvider.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kyuubi.ha.client +package org.apache.kyuubi.ha.client.zookeeper import java.io.{File, IOException} import javax.security.auth.login.Configuration @@ -30,15 +30,16 @@ import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManage import org.apache.kyuubi.Logging import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf._ +import org.apache.kyuubi.ha.client.AuthTypes +import org.apache.kyuubi.ha.client.RetryPolicies +import org.apache.kyuubi.ha.client.RetryPolicies._ import org.apache.kyuubi.util.KyuubiHadoopUtils -object ZooKeeperClientProvider extends Logging { - - import RetryPolicies._ +object ClientProvider extends Logging { /** * Create a [[CuratorFramework]] instance to be used as the ZooKeeper client - * Use the [[ZooKeeperACLProvider]] to create appropriate ACLs + * Use the [[ACLProviderWrapper]] to create appropriate ACLs */ def buildZookeeperClient(conf: KyuubiConf): CuratorFramework = { setUpZooKeeperAuth(conf) @@ -61,7 +62,7 @@ object ZooKeeperClientProvider extends Logging { .connectString(connectionStr) .sessionTimeoutMs(sessionTimeout) .connectionTimeoutMs(connectionTimeout) - .aclProvider(new ZooKeeperACLProvider(conf)) + .aclProvider(new ACLProviderWrapper(conf)) .retryPolicy(retryPolicy) conf.get(HA_ZK_AUTH_DIGEST) match { @@ -94,23 +95,6 @@ object ZooKeeperClientProvider extends Logging { } } - /** - * Creates a zookeeper client before calling `f` and close it after calling `f`. - */ - def withZkClient[T](conf: KyuubiConf)(f: CuratorFramework => T): T = { - val zkClient = buildZookeeperClient(conf) - try { - zkClient.start() - f(zkClient) - } finally { - try { - zkClient.close() - } catch { - case e: IOException => error("Failed to release the zkClient", e) - } - } - } - /** * For a kerberized cluster, we dynamically set up the client's JAAS conf. * @@ -136,10 +120,10 @@ object ZooKeeperClientProvider extends Logging { } if (conf.get(HA_ZK_ENGINE_REF_ID).isEmpty - && ZooKeeperAuthTypes.withName(conf.get(HA_ZK_AUTH_TYPE)) == ZooKeeperAuthTypes.KERBEROS) { + && AuthTypes.withName(conf.get(HA_ZK_AUTH_TYPE)) == AuthTypes.KERBEROS) { setupZkAuth() - } else if (conf.get(HA_ZK_ENGINE_REF_ID).nonEmpty && ZooKeeperAuthTypes - .withName(conf.get(HA_ZK_ENGINE_AUTH_TYPE)) == ZooKeeperAuthTypes.KERBEROS) { + } else if (conf.get(HA_ZK_ENGINE_REF_ID).nonEmpty && AuthTypes + .withName(conf.get(HA_ZK_ENGINE_AUTH_TYPE)) == AuthTypes.KERBEROS) { setupZkAuth() } diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/DiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/DiscoveryClient.scala new file mode 100644 index 00000000000..8b31297c3ba --- /dev/null +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/DiscoveryClient.scala @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.ha.client.zookeeper + +import java.nio.charset.StandardCharsets +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ + +import com.google.common.annotations.VisibleForTesting +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex +import org.apache.curator.framework.recipes.nodes.PersistentNode +import org.apache.curator.utils.ZKPaths +import org.apache.zookeeper.CreateMode + +import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.Logging +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.ha.client.ServiceNodeInfo +import org.apache.kyuubi.ha.client.zookeeper.ClientProvider.buildZookeeperClient +import org.apache.kyuubi.ha.client.zookeeper.DiscoveryClient.parseInstanceHostPort +import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode + +class DiscoveryClient(conf: KyuubiConf) extends Logging { + + private val zkClient: CuratorFramework = buildZookeeperClient(conf) + + def createClient(): Unit = { + zkClient.start() + } + + def closeClient(): Unit = { + zkClient.close() + } + + def create(path: String, mode: String, createParent: Boolean = true): String = { + val builder = + if (createParent) zkClient.create().creatingParentsIfNeeded() else zkClient.create() + builder + .withMode(CreateMode.valueOf(mode)) + .forPath(path) + } + + def getData(path: String): Array[Byte] = { + zkClient.getData.forPath(path) + } + + def getChildren(path: String): List[String] = { + zkClient.getChildren.forPath(path).asScala.toList + } + + def pathExists(path: String): Boolean = { + zkClient.checkExists().forPath(path) != null + } + + def pathNonExists(path: String): Boolean = { + zkClient.checkExists().forPath(path) == null + } + + def delete(path: String): Unit = { + zkClient.delete().forPath(path) + } + + def tryWithLock[T]( + lockPath: String, + timeout: Long, + unit: TimeUnit = TimeUnit.MILLISECONDS)(f: => T): T = { + var lock: InterProcessSemaphoreMutex = null + try { + try { + lock = new InterProcessSemaphoreMutex(zkClient, lockPath) + // Acquire a lease. If no leases are available, this method blocks until either the + // maximum number of leases is increased or another client/process closes a lease + lock.acquire(timeout, unit) + } catch { + case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e) + } + f + } finally { + try { + if (lock != null) { + lock.release() + } + } catch { + case _: Exception => + } + } + } + + def getServerHost(namespace: String): Option[(String, Int)] = { + // TODO: use last one because to avoid touching some maybe-crashed engines + // We need a big improvement here. + getServiceNodesInfo(namespace, Some(1), silent = true) match { + case Seq(sn) => Some((sn.host, sn.port)) + case _ => None + } + } + + def getEngineByRefId( + namespace: String, + engineRefId: String): Option[(String, Int)] = { + getServiceNodesInfo(namespace, silent = true) + .find(_.engineRefId.exists(_.equals(engineRefId))) + .map(data => (data.host, data.port)) + } + + def getServiceNodesInfo( + namespace: String, + sizeOpt: Option[Int] = None, + silent: Boolean = false): Seq[ServiceNodeInfo] = { + try { + val hosts = zkClient.getChildren.forPath(namespace) + val size = sizeOpt.getOrElse(hosts.size()) + hosts.asScala.takeRight(size).map { p => + val path = ZKPaths.makePath(namespace, p) + val instance = new String(zkClient.getData.forPath(path), StandardCharsets.UTF_8) + val (host, port) = parseInstanceHostPort(instance) + val version = p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version=")) + val engineRefId = p.split(";").find(_.startsWith("refId=")).map(_.stripPrefix("refId=")) + info(s"Get service instance:$instance and version:$version under $namespace") + ServiceNodeInfo(namespace, p, host, port, version, engineRefId) + } + } catch { + case _: Exception if silent => Nil + case e: Exception => + error(s"Failed to get service node info", e) + Nil + } + } + + def createAndGetServiceNode( + conf: KyuubiConf, + namespace: String, + instance: String, + version: Option[String] = None, + external: Boolean = false): String = { + createServiceNode(conf, zkClient, namespace, instance, version, external).getActualPath + } + + @VisibleForTesting + def startSecretNode( + createMode: String, + basePath: String, + initData: String, + useProtection: Boolean = false): Unit = { + val secretNode = new PersistentNode( + zkClient, + CreateMode.valueOf(createMode), + useProtection, + basePath, + initData.getBytes(StandardCharsets.UTF_8)) + secretNode.start() + } +} + +object DiscoveryClient { + + def makePath(parent: String, firstChild: String): String = { + ZKPaths.makePath(parent, firstChild) + } + + def makePath(parent: String, firstChild: String, restChildren: Array[String]): String = { + ZKPaths.makePath(parent, firstChild, restChildren: _*) + } + + @VisibleForTesting + private[client] def parseInstanceHostPort(instance: String): (String, Int) = { + val maybeInfos = instance.split(";") + .map(_.split("=", 2)) + .filter(_.size == 2) + .map(i => (i(0), i(1))) + .toMap + if (maybeInfos.size > 0) { + ( + maybeInfos.get("hive.server2.thrift.bind.host").get, + maybeInfos.get("hive.server2.thrift.port").get.toInt) + } else { + val strings = instance.split(":") + (strings(0), strings(1).toInt) + } + } +} diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala index 98efe239974..26cc4f13a67 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala @@ -46,7 +46,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NODE_TIMEOUT import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_PUBLISH_CONFIGS import org.apache.kyuubi.ha.client.ServiceDiscovery -import org.apache.kyuubi.ha.client.ZooKeeperClientProvider.{buildZookeeperClient, getGracefulStopThreadDelay} +import org.apache.kyuubi.ha.client.zookeeper.ClientProvider.{buildZookeeperClient, getGracefulStopThreadDelay} import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.connectionChecker import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode import org.apache.kyuubi.util.KyuubiHadoopUtils diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala index 03730795160..75af21e3aed 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala @@ -21,10 +21,10 @@ import java.nio.charset.StandardCharsets import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_SECURE_SECRET_NODE -import org.apache.kyuubi.ha.client.ZooKeeperClientProvider +import org.apache.kyuubi.ha.client.DiscoveryClientProvider class ZooKeeperEngineSecuritySecretProviderImpl extends EngineSecuritySecretProvider { - import ZooKeeperClientProvider._ + import DiscoveryClientProvider._ private var conf: KyuubiConf = _ @@ -34,8 +34,8 @@ class ZooKeeperEngineSecuritySecretProviderImpl extends EngineSecuritySecretProv override def getSecret(): String = { conf.get(HA_ZK_ENGINE_SECURE_SECRET_NODE).map { zkNode => - withZkClient[String](conf) { zkClient => - new String(zkClient.getData.forPath(zkNode), StandardCharsets.UTF_8) + withDiscoveryClient[String](conf) { discoveryClient => + new String(discoveryClient.getData(zkNode), StandardCharsets.UTF_8) } }.getOrElse( throw new IllegalArgumentException(s"${HA_ZK_ENGINE_SECURE_SECRET_NODE.key} is not defined")) diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala new file mode 100644 index 00000000000..ca2a4ba8839 --- /dev/null +++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.ha.client + +import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.config.KyuubiConf + +class DiscoveryClientProviderSuite extends KyuubiFunSuite { + test("discovery") { + val conf = KyuubiConf() + DiscoveryClientProvider.withDiscoveryClient(conf) { discoveryClient => + discoveryClient.getServerHost("/kyuubi") + } + DiscoveryClientProvider.withDiscoveryClient(conf) { discoveryClient => + discoveryClient.getServerHost("/kyuubi") + } + } +} diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ClientProviderSuite.scala similarity index 68% rename from kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala rename to kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ClientProviderSuite.scala index 97cbdf8a5b3..0305d33691d 100644 --- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala +++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ClientProviderSuite.scala @@ -15,13 +15,16 @@ * limitations under the License. */ -package org.apache.kyuubi.ha.client +package org.apache.kyuubi.ha.client.zookeeper import org.apache.kyuubi.KyuubiFunSuite import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_CONN_BASE_RETRY_WAIT, HA_ZK_CONN_MAX_RETRIES, HA_ZK_CONN_MAX_RETRY_WAIT, HA_ZK_CONN_RETRY_POLICY} +import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_BASE_RETRY_WAIT +import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRIES +import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRY_WAIT +import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY -class ZooKeeperClientProviderSuite extends KyuubiFunSuite { +class ClientProviderSuite extends KyuubiFunSuite { test("get graceful stop thread start delay") { val conf = KyuubiConf() @@ -29,23 +32,23 @@ class ZooKeeperClientProviderSuite extends KyuubiFunSuite { val baseSleepTime = conf.get(HA_ZK_CONN_BASE_RETRY_WAIT) val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT) val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES) - val delay1 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf) + val delay1 = ClientProvider.getGracefulStopThreadDelay(conf) assert(delay1 >= baseSleepTime * maxRetries) conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME") - val delay2 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf) + val delay2 = ClientProvider.getGracefulStopThreadDelay(conf) assert(delay2 === baseSleepTime) conf.set(HA_ZK_CONN_RETRY_POLICY, "N_TIME") - val delay3 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf) + val delay3 = ClientProvider.getGracefulStopThreadDelay(conf) assert(delay3 === baseSleepTime * maxRetries) conf.set(HA_ZK_CONN_RETRY_POLICY, "UNTIL_ELAPSED") - val delay4 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf) + val delay4 = ClientProvider.getGracefulStopThreadDelay(conf) assert(delay4 === maxSleepTime) conf.set(HA_ZK_CONN_RETRY_POLICY, "BOUNDED_EXPONENTIAL_BACKOFF") - val delay5 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf) + val delay5 = ClientProvider.getGracefulStopThreadDelay(conf) assert(delay5 >= baseSleepTime * maxRetries) } } diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/DiscoveryClientSuite.scala similarity index 83% rename from kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala rename to kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/DiscoveryClientSuite.scala index 182bf56e413..007531563b9 100644 --- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala +++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/DiscoveryClientSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kyuubi.ha.client +package org.apache.kyuubi.ha.client.zookeeper import java.io.{File, IOException} import java.net.InetAddress @@ -33,11 +33,15 @@ import org.scalatest.time.SpanSugar._ import org.apache.kyuubi.{KerberizedTestHelper, KYUUBI_VERSION} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf._ +import org.apache.kyuubi.ha.client.AuthTypes +import org.apache.kyuubi.ha.client.DiscoveryClientProvider +import org.apache.kyuubi.ha.client.EngineServiceDiscovery +import org.apache.kyuubi.ha.client.KyuubiServiceDiscovery import org.apache.kyuubi.service._ import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf} -class ServiceDiscoverySuite extends KerberizedTestHelper { - import ZooKeeperClientProvider._ +class DiscoveryClientSuite extends KerberizedTestHelper { + import DiscoveryClientProvider._ val zkServer = new EmbeddedZookeeper() val conf: KyuubiConf = KyuubiConf() @@ -80,17 +84,17 @@ class ServiceDiscoverySuite extends KerberizedTestHelper { server.initialize(conf) server.start() val znodeRoot = s"/$namespace" - withZkClient(conf) { framework => + withDiscoveryClient(conf) { framework => try { - assert(framework.checkExists().forPath("/abc") === null) - assert(framework.checkExists().forPath(znodeRoot) !== null) - val children = framework.getChildren.forPath(znodeRoot).asScala + assert(framework.pathNonExists("/abc")) + assert(framework.pathExists(znodeRoot)) + val children = framework.getChildren(znodeRoot) assert(children.head === s"serviceUri=${server.frontendServices.head.connectionUrl};" + s"version=$KYUUBI_VERSION;sequence=0000000000") children.foreach { child => - framework.delete().forPath(s"""$znodeRoot/$child""") + framework.delete(s"""$znodeRoot/$child""") } eventually(timeout(5.seconds), interval(100.millis)) { assert(serviceDiscovery.getServiceState === ServiceState.STOPPED) @@ -112,21 +116,21 @@ class ServiceDiscoverySuite extends KerberizedTestHelper { assert(actual === expected) } - val acl = new ZooKeeperACLProvider(conf).getDefaultAcl + val acl = new ACLProviderWrapper(conf).getDefaultAcl assertACL(expectedNoACL, acl) - val serverConf = conf.clone.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.KERBEROS.toString) - val serverACL = new ZooKeeperACLProvider(serverConf).getDefaultAcl + val serverConf = conf.clone.set(HA_ZK_AUTH_TYPE, AuthTypes.KERBEROS.toString) + val serverACL = new ACLProviderWrapper(serverConf).getDefaultAcl assertACL(expectedEnableACL, serverACL) val engineConf = serverConf.clone.set(HA_ZK_ENGINE_REF_ID, "ref") - engineConf.set(HA_ZK_ENGINE_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString) - val engineACL = new ZooKeeperACLProvider(engineConf).getDefaultAcl + engineConf.set(HA_ZK_ENGINE_AUTH_TYPE, AuthTypes.NONE.toString) + val engineACL = new ACLProviderWrapper(engineConf).getDefaultAcl assertACL(expectedNoACL, engineACL) val enableEngineACLConf = serverConf.clone.set(HA_ZK_ENGINE_REF_ID, "ref") - enableEngineACLConf.set(HA_ZK_ENGINE_AUTH_TYPE, ZooKeeperAuthTypes.KERBEROS.toString) - val enableEngineACL = new ZooKeeperACLProvider(enableEngineACLConf).getDefaultAcl + enableEngineACLConf.set(HA_ZK_ENGINE_AUTH_TYPE, AuthTypes.KERBEROS.toString) + val enableEngineACL = new ACLProviderWrapper(enableEngineACLConf).getDefaultAcl assertACL(expectedEnableACL, enableEngineACL) } @@ -137,9 +141,9 @@ class ServiceDiscoverySuite extends KerberizedTestHelper { conf.set(HA_ZK_AUTH_KEYTAB.key, keytab.getCanonicalPath) conf.set(HA_ZK_AUTH_PRINCIPAL.key, principal) - conf.set(HA_ZK_AUTH_TYPE.key, ZooKeeperAuthTypes.KERBEROS.toString) + conf.set(HA_ZK_AUTH_TYPE.key, AuthTypes.KERBEROS.toString) - ZooKeeperClientProvider.setUpZooKeeperAuth(conf) + ClientProvider.setUpZooKeeperAuth(conf) val configuration = Configuration.getConfiguration val entries = configuration.getAppConfigurationEntry("KyuubiZooKeeperClient") @@ -151,7 +155,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper { assert(options("useKeyTab").toString.toBoolean) conf.set(HA_ZK_AUTH_KEYTAB.key, s"${keytab.getName}") - val e = intercept[IOException](ZooKeeperClientProvider.setUpZooKeeperAuth(conf)) + val e = intercept[IOException](ClientProvider.setUpZooKeeperAuth(conf)) assert(e.getMessage === s"${HA_ZK_AUTH_KEYTAB.key} does not exists") } } @@ -167,7 +171,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper { .set(HA_ZK_QUORUM, zkServer.getConnectString) .set(HA_ZK_NAMESPACE, namespace) .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) - .set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString) + .set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString) var serviceDiscovery: KyuubiServiceDiscovery = null val server: Serverable = new NoopTBinaryFrontendServer() { @@ -183,18 +187,18 @@ class ServiceDiscoverySuite extends KerberizedTestHelper { server.start() val znodeRoot = s"/$namespace" - withZkClient(conf) { framework => + withDiscoveryClient(conf) { framework => try { - assert(framework.checkExists().forPath("/abc") === null) - assert(framework.checkExists().forPath(znodeRoot) !== null) - val children = framework.getChildren.forPath(znodeRoot).asScala + assert(framework.pathNonExists("/abc")) + assert(framework.pathExists(znodeRoot)) + val children = framework.getChildren(znodeRoot) assert(children.head === s"serviceUri=${server.frontendServices.head.connectionUrl};" + s"version=$KYUUBI_VERSION;sequence=0000000000") children.foreach { child => - framework.delete().forPath(s"""$znodeRoot/$child""") + framework.delete(s"""$znodeRoot/$child""") } eventually(timeout(5.seconds), interval(100.millis)) { assert(serviceDiscovery.getServiceState === ServiceState.STOPPED) @@ -216,7 +220,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper { val host = "127.0.0.1" val port = 10009 val instance1 = s"$host:$port" - val (host1, port1) = ServiceDiscovery.parseInstanceHostPort(instance1) + val (host1, port1) = DiscoveryClient.parseInstanceHostPort(instance1) assert(host === host1) assert(port === port1) @@ -224,7 +228,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper { s"hive.server2.transport.mode=binary;hive.server2.authentication=KERBEROS;" + s"hive.server2.thrift.port=$port;" + s"hive.server2.authentication.kerberos.principal=test/_HOST@apache.org" - val (host2, port2) = ServiceDiscovery.parseInstanceHostPort(instance2) + val (host2, port2) = DiscoveryClient.parseInstanceHostPort(instance2) assert(host === host2) assert(port === port2) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index 21a8a89d242..3c0fde0b7a3 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -24,9 +24,6 @@ import scala.util.Random import com.codahale.metrics.MetricRegistry import com.google.common.annotations.VisibleForTesting -import org.apache.curator.framework.CuratorFramework -import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex -import org.apache.curator.utils.ZKPaths import org.apache.hadoop.security.UserGroupInformation import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils} @@ -40,8 +37,7 @@ import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.engine.trino.TrinoProcessBuilder import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE -import org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineByRefId -import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost +import org.apache.kyuubi.ha.client.zookeeper.DiscoveryClient import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL} import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.operation.log.OperationLog @@ -134,8 +130,8 @@ private[kyuubi] class EngineRef( private[kyuubi] lazy val engineSpace: String = { val commonParent = s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType" shareLevel match { - case CONNECTION => ZKPaths.makePath(commonParent, appUser, engineRefId) - case _ => ZKPaths.makePath(commonParent, appUser, subdomain) + case CONNECTION => DiscoveryClient.makePath(commonParent, appUser, Array(engineRefId)) + case _ => DiscoveryClient.makePath(commonParent, appUser, Array(subdomain)) } } @@ -143,38 +139,19 @@ private[kyuubi] class EngineRef( * The distributed lock path used to ensure only once engine being created for non-CONNECTION * share level. */ - private def tryWithLock[T](zkClient: CuratorFramework)(f: => T): T = shareLevel match { + private def tryWithLock[T](discoveryClient: DiscoveryClient)(f: => T): T = shareLevel match { case CONNECTION => f case _ => val lockPath = - ZKPaths.makePath(s"${serverSpace}_$shareLevel", "lock", appUser, subdomain) - var lock: InterProcessSemaphoreMutex = null - try { - try { - lock = new InterProcessSemaphoreMutex(zkClient, lockPath) - // Acquire a lease. If no leases are available, this method blocks until either the - // maximum number of leases is increased or another client/process closes a lease - lock.acquire(timeout, TimeUnit.MILLISECONDS) - } catch { - case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e) - } - f - } finally { - try { - if (lock != null) { - lock.release() - } - } catch { - case _: Exception => - } - } + DiscoveryClient.makePath(s"${serverSpace}_$shareLevel", "lock", Array(appUser, subdomain)) + discoveryClient.tryWithLock(lockPath, timeout, TimeUnit.MILLISECONDS)(f) } private def create( - zkClient: CuratorFramework, - extraEngineLog: Option[OperationLog]): (String, Int) = tryWithLock(zkClient) { + discoveryClient: DiscoveryClient, + extraEngineLog: Option[OperationLog]): (String, Int) = tryWithLock(discoveryClient) { // Get the engine address ahead if another process has succeeded - var engineRef = getServerHost(zkClient, engineSpace) + var engineRef = discoveryClient.getServerHost(engineSpace) if (engineRef.nonEmpty) return engineRef.get conf.set(HA_ZK_NAMESPACE, engineSpace) @@ -227,7 +204,7 @@ private[kyuubi] class EngineRef( s"Timeout($timeout ms) to launched $engineType engine with $builder. $killMessage", builder.getError) } - engineRef = getEngineByRefId(zkClient, engineSpace, engineRefId) + engineRef = discoveryClient.getEngineByRefId(engineSpace, engineRefId) } engineRef.get } finally { @@ -240,15 +217,15 @@ private[kyuubi] class EngineRef( /** * Get the engine ref from engine space first or create a new one * - * @param zkClient the zookeeper client to get or create engine instance + * @param discoveryClient the zookeeper client to get or create engine instance * @param extraEngineLog the launch engine operation log, used to inject engine log into it */ def getOrCreate( - zkClient: CuratorFramework, + discoveryClient: DiscoveryClient, extraEngineLog: Option[OperationLog] = None): (String, Int) = { - getServerHost(zkClient, engineSpace) + discoveryClient.getServerHost(engineSpace) .getOrElse { - create(zkClient, extraEngineLog) + create(discoveryClient, extraEngineLog) } } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index ab6b8f5c325..09923e1e64b 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -33,7 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_MAIN_RESOURCE, FRONTEND_THRIFT_BINARY_BIND_HOST} import org.apache.kyuubi.engine.ProcBuilder import org.apache.kyuubi.ha.HighAvailabilityConf -import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes +import org.apache.kyuubi.ha.client.AuthTypes import org.apache.kyuubi.operation.log.OperationLog class SparkProcessBuilder( @@ -110,8 +110,8 @@ class SparkProcessBuilder( var allConf = conf.getAll // if enable sasl kerberos authentication for zookeeper, need to upload the server ketab file - if (ZooKeeperAuthTypes.withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE)) - == ZooKeeperAuthTypes.KERBEROS) { + if (AuthTypes.withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE)) + == AuthTypes.KERBEROS) { allConf = allConf ++ zkAuthKeytabFileConf(allConf) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index 3f6e34eb1d1..92cad7d87b2 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -21,9 +21,7 @@ import java.util import scala.util.Properties -import org.apache.curator.utils.ZKPaths import org.apache.hadoop.security.UserGroupInformation -import org.apache.zookeeper.CreateMode.PERSISTENT import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.NodeExistsException @@ -33,8 +31,10 @@ import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS, FrontendProtocol import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._ import org.apache.kyuubi.events.{EventLogging, KyuubiServerInfoEvent} import org.apache.kyuubi.ha.HighAvailabilityConf._ -import org.apache.kyuubi.ha.client.{ServiceDiscovery, ZooKeeperAuthTypes} -import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._ +import org.apache.kyuubi.ha.client.AuthTypes +import org.apache.kyuubi.ha.client.DiscoveryClientProvider._ +import org.apache.kyuubi.ha.client.ServiceDiscovery +import org.apache.kyuubi.ha.client.zookeeper.DiscoveryClient import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem} import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState} import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister} @@ -49,7 +49,7 @@ object KyuubiServer extends Logging { zkServer.initialize(conf) zkServer.start() conf.set(HA_ZK_QUORUM, zkServer.getConnectString) - conf.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString) + conf.set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString) } else { // create chroot path if necessary val connectionStr = conf.get(HA_ZK_QUORUM) @@ -69,15 +69,11 @@ object KyuubiServer extends Logging { chrootOption.foreach { chroot => val zkConnectionForChrootCreation = connectionStr.substring(0, chrootIndex) val overrideQuorumConf = conf.clone.set(HA_ZK_QUORUM, zkConnectionForChrootCreation) - withZkClient(overrideQuorumConf) { zkClient => - if (zkClient.checkExists().forPath(chroot) == null) { - val chrootPath = ZKPaths.makePath(null, chroot) + withDiscoveryClient(overrideQuorumConf) { discoveryClient => + if (discoveryClient.pathNonExists(chroot)) { + val chrootPath = DiscoveryClient.makePath(null, chroot) try { - zkClient - .create() - .creatingParentsIfNeeded() - .withMode(PERSISTENT) - .forPath(chrootPath) + discoveryClient.create(chrootPath, "PERSISTENT") } catch { case _: NodeExistsException => // do nothing case e: KeeperException => diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 8d7b0ab6dca..296874d3b6e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -28,7 +28,7 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.EngineRef import org.apache.kyuubi.events.{EventLogging, KyuubiEvent, KyuubiSessionEvent} -import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._ +import org.apache.kyuubi.ha.client.DiscoveryClientProvider._ import org.apache.kyuubi.metrics.MetricsConstants._ import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationState} @@ -94,8 +94,8 @@ class KyuubiSessionImpl( } private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = { - withZkClient(sessionConf) { zkClient => - val (host, port) = engine.getOrCreate(zkClient, extraEngineLog) + withDiscoveryClient(sessionConf) { discoveryClient => + val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog) val passwd = if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) { EngineSecurityAccessor.get().issueToken() diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala index 38171cefabd..075c79faa66 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala @@ -21,7 +21,7 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_AUTH_TYPE, HA_ZK_QUORUM} -import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes +import org.apache.kyuubi.ha.client.AuthTypes import org.apache.kyuubi.server.KyuubiServer import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf} @@ -48,7 +48,7 @@ trait WithKyuubiServer extends KyuubiFunSuite { zkServer.initialize(conf) zkServer.start() conf.set(HA_ZK_QUORUM, zkServer.getConnectString) - conf.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString) + conf.set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString) conf.set("spark.ui.enabled", "false") conf.setIfMissing("spark.sql.catalogImplementation", "in-memory") diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala index 861dd0f02aa..1065b22c2fb 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala @@ -19,7 +19,6 @@ package org.apache.kyuubi.engine import java.util.UUID -import org.apache.curator.utils.ZKPaths import org.apache.hadoop.security.UserGroupInformation import org.scalatest.time.SpanSugar.convertIntToGrainOfTime @@ -27,7 +26,8 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.ha.HighAvailabilityConf -import org.apache.kyuubi.ha.client.ZooKeeperClientProvider +import org.apache.kyuubi.ha.client.DiscoveryClientProvider +import org.apache.kyuubi.ha.client.zookeeper.DiscoveryClient import org.apache.kyuubi.util.NamedThreadFactory import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf} @@ -67,7 +67,10 @@ class EngineRefSuite extends KyuubiFunSuite { domain.foreach(conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, _)) val engine = new EngineRef(conf, user, id) assert(engine.engineSpace === - ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${CONNECTION}_${engineType}", user, id)) + DiscoveryClient.makePath( + s"kyuubi_${KYUUBI_VERSION}_${CONNECTION}_${engineType}", + user, + Array(id))) assert(engine.defaultEngineName === s"kyuubi_${CONNECTION}_${engineType}_${user}_$id") } } @@ -78,7 +81,10 @@ class EngineRefSuite extends KyuubiFunSuite { conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString) val appName = new EngineRef(conf, user, id) assert(appName.engineSpace === - ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${USER}_$FLINK_SQL", user, "default")) + DiscoveryClient.makePath( + s"kyuubi_${KYUUBI_VERSION}_${USER}_$FLINK_SQL", + user, + Array("default"))) assert(appName.defaultEngineName === s"kyuubi_${USER}_${FLINK_SQL}_${user}_default_$id") Seq(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN, KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN).foreach { @@ -87,7 +93,10 @@ class EngineRefSuite extends KyuubiFunSuite { conf.set(k.key, "abc") val appName2 = new EngineRef(conf, user, id) assert(appName2.engineSpace === - ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${USER}_${FLINK_SQL}", user, "abc")) + DiscoveryClient.makePath( + s"kyuubi_${KYUUBI_VERSION}_${USER}_${FLINK_SQL}", + user, + Array("abc"))) assert(appName2.defaultEngineName === s"kyuubi_${USER}_${FLINK_SQL}_${user}_abc_$id") } } @@ -99,7 +108,10 @@ class EngineRefSuite extends KyuubiFunSuite { val engineRef = new EngineRef(conf, user, id) val primaryGroupName = UserGroupInformation.createRemoteUser(user).getPrimaryGroupName assert(engineRef.engineSpace === - ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL", primaryGroupName, "default")) + DiscoveryClient.makePath( + s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL", + primaryGroupName, + Array("default"))) assert(engineRef.defaultEngineName === s"kyuubi_GROUP_SPARK_SQL_${primaryGroupName}_default_$id") @@ -109,10 +121,10 @@ class EngineRefSuite extends KyuubiFunSuite { conf.set(k.key, "abc") val engineRef2 = new EngineRef(conf, user, id) assert(engineRef2.engineSpace === - ZKPaths.makePath( + DiscoveryClient.makePath( s"kyuubi_${KYUUBI_VERSION}_${GROUP}_${SPARK_SQL}", primaryGroupName, - "abc")) + Array("abc"))) assert(engineRef2.defaultEngineName === s"kyuubi_${GROUP}_${SPARK_SQL}_${primaryGroupName}_abc_$id") } @@ -122,7 +134,7 @@ class EngineRefSuite extends KyuubiFunSuite { assert(newUGI.getGroupNames.isEmpty) val engineRef3 = new EngineRef(conf, userName, id) assert(engineRef3.engineSpace === - ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL", userName, "abc")) + DiscoveryClient.makePath(s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL", userName, Array("abc"))) assert(engineRef3.defaultEngineName === s"kyuubi_GROUP_SPARK_SQL_${userName}_abc_$id") } @@ -132,13 +144,19 @@ class EngineRefSuite extends KyuubiFunSuite { conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString) val appName = new EngineRef(conf, user, id) assert(appName.engineSpace === - ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}", user, "default")) + DiscoveryClient.makePath( + s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}", + user, + Array("default"))) assert(appName.defaultEngineName === s"kyuubi_${SERVER}_${FLINK_SQL}_${user}_default_$id") conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc") val appName2 = new EngineRef(conf, user, id) assert(appName2.engineSpace === - ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}", user, "abc")) + DiscoveryClient.makePath( + s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}", + user, + Array("abc"))) assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${FLINK_SQL}_${user}_abc_$id") } @@ -200,7 +218,7 @@ class EngineRefSuite extends KyuubiFunSuite { val r1 = new Runnable { override def run(): Unit = { - ZooKeeperClientProvider.withZkClient(conf) { client => + DiscoveryClientProvider.withDiscoveryClient(conf) { client => val hp = engine.getOrCreate(client) port1 = hp._2 } @@ -209,7 +227,7 @@ class EngineRefSuite extends KyuubiFunSuite { val r2 = new Runnable { override def run(): Unit = { - ZooKeeperClientProvider.withZkClient(conf) { client => + DiscoveryClientProvider.withDiscoveryClient(conf) { client => val hp = engine.getOrCreate(client) port2 = hp._2 } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index 79fb21f9621..92b9635a6a5 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -28,7 +28,7 @@ import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_LOG_TIMEOUT, ENGINE_SPARK_MAIN_RESOURCE, FRONTEND_THRIFT_BINARY_BIND_HOST} import org.apache.kyuubi.ha.HighAvailabilityConf -import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes +import org.apache.kyuubi.ha.client.AuthTypes import org.apache.kyuubi.service.ServiceUtils class SparkProcessBuilderSuite extends KerberizedTestHelper { @@ -263,7 +263,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper { test("zookeeper kerberos authentication") { val conf = KyuubiConf() - conf.set(HighAvailabilityConf.HA_ZK_AUTH_TYPE.key, ZooKeeperAuthTypes.KERBEROS.toString) + conf.set(HighAvailabilityConf.HA_ZK_AUTH_TYPE.key, AuthTypes.KERBEROS.toString) conf.set(HighAvailabilityConf.HA_ZK_AUTH_KEYTAB.key, testKeytab) conf.set(HighAvailabilityConf.HA_ZK_AUTH_PRINCIPAL.key, testPrincipal) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala index ae5d611e09a..5face684f63 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala @@ -17,19 +17,14 @@ package org.apache.kyuubi.operation -import java.nio.charset.StandardCharsets - -import org.apache.curator.framework.recipes.nodes.PersistentNode -import org.apache.zookeeper.CreateMode - import org.apache.kyuubi.WithKyuubiServer import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf -import org.apache.kyuubi.ha.client.ZooKeeperClientProvider +import org.apache.kyuubi.ha.client.DiscoveryClientProvider import org.apache.kyuubi.service.authentication.{EngineSecurityAccessor, ZooKeeperEngineSecuritySecretProviderImpl} class KyuubiOperationWithEngineSecurity extends WithKyuubiServer with HiveJDBCTestHelper { - import ZooKeeperClientProvider._ + import DiscoveryClientProvider._ override protected def jdbcUrl: String = getJdbcUrl @@ -46,15 +41,9 @@ class KyuubiOperationWithEngineSecurity extends WithKyuubiServer with HiveJDBCTe override def beforeAll(): Unit = { super.beforeAll() - withZkClient(conf) { zkClient => - zkClient.create().withMode(CreateMode.PERSISTENT).forPath(engineSecretNode) - val secretNode = new PersistentNode( - zkClient, - CreateMode.PERSISTENT, - false, - engineSecretNode, - "_ENGINE_SECRET_".getBytes(StandardCharsets.UTF_8)) - secretNode.start() + withDiscoveryClient(conf) { discoveryClient => + discoveryClient.create(engineSecretNode, "PERSISTENT", false) + discoveryClient.startSecretNode("PERSISTENT", engineSecretNode, "_ENGINE_SECRET_") } conf.set(KyuubiConf.ENGINE_SECURITY_ENABLED, true) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala index 62284fd9d19..fba26709d4c 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala @@ -20,7 +20,7 @@ package org.apache.kyuubi.server import org.apache.kyuubi.{KyuubiFunSuite, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ha.HighAvailabilityConf -import org.apache.kyuubi.ha.client.ZooKeeperClientProvider +import org.apache.kyuubi.ha.client.DiscoveryClientProvider import org.apache.kyuubi.service.ServiceState._ import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf} @@ -116,16 +116,16 @@ class KyuubiServerSuite extends KyuubiFunSuite { val chrootPath = "/lake" conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkConnection) // chroot path does not exist before server start - ZooKeeperClientProvider.withZkClient(conf) { client => - assert(client.checkExists().forPath(chrootPath) == null) + DiscoveryClientProvider.withDiscoveryClient(conf) { client => + assert(client.pathNonExists(chrootPath)) } val zkWithChroot = zkConnection + chrootPath val chrootConf = conf.clone.set(HighAvailabilityConf.HA_ZK_QUORUM, zkWithChroot) server = KyuubiServer.startServer(chrootConf) // chroot path exists after server started - ZooKeeperClientProvider.withZkClient(conf) { client => - assert(client.checkExists().forPath(chrootPath) != null) + DiscoveryClientProvider.withDiscoveryClient(conf) { client => + assert(client.pathExists(chrootPath)) } } }