diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
index 47ed53ce4b3..3ce30f4be0a 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala
@@ -50,18 +50,18 @@ abstract class ShareLevelSparkEngineSuite
}
test("check discovery service is clean up with different share level") {
- withZkClient { zkClient =>
+ withDiscoveryClient { discoveryClient =>
assert(engine.getServiceState == ServiceState.STARTED)
- assert(zkClient.checkExists().forPath(namespace) != null)
+ assert(discoveryClient.pathExists(namespace))
withJdbcStatement() { _ => }
shareLevel match {
// Connection level, we will cleanup namespace since it's always a global unique value.
case ShareLevel.CONNECTION =>
assert(engine.getServiceState == ServiceState.STOPPED)
- assert(zkClient.checkExists().forPath(namespace) == null)
+ assert(discoveryClient.pathNonExists(namespace))
case _ =>
assert(engine.getServiceState == ServiceState.STARTED)
- assert(zkClient.checkExists().forPath(namespace) != null)
+ assert(discoveryClient.pathExists(namespace))
}
}
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
index 481b8ec7e7f..fa52b8c7c89 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithDiscoverySparkSQLEngine.scala
@@ -17,12 +17,12 @@
package org.apache.kyuubi.engine.spark
-import org.apache.curator.framework.CuratorFramework
-
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_AUTH_TYPE, HA_ZK_NAMESPACE, HA_ZK_QUORUM}
-import org.apache.kyuubi.ha.client.{ZooKeeperAuthTypes, ZooKeeperClientProvider}
+import org.apache.kyuubi.ha.client.AuthTypes
+import org.apache.kyuubi.ha.client.DiscoveryClient
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
@@ -32,7 +32,7 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
assert(zkServer != null)
Map(
HA_ZK_QUORUM.key -> zkServer.getConnectString,
- HA_ZK_AUTH_TYPE.key -> ZooKeeperAuthTypes.NONE.toString,
+ HA_ZK_AUTH_TYPE.key -> AuthTypes.NONE.toString,
HA_ZK_NAMESPACE.key -> namespace)
}
@@ -62,8 +62,8 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
stopSparkEngine()
}
- def withZkClient(f: CuratorFramework => Unit): Unit = {
- ZooKeeperClientProvider.withZkClient(kyuubiConf)(f)
+ def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = {
+ DiscoveryClientProvider.withDiscoveryClient(kyuubiConf)(f)
}
protected def getDiscoveryConnectionString: String = {
diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala
index 44da3cda495..2585989a269 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/ServiceControlCli.scala
@@ -19,15 +19,14 @@ package org.apache.kyuubi.ctl
import scala.collection.mutable.ListBuffer
-import org.apache.curator.framework.CuratorFramework
-import org.apache.curator.utils.ZKPaths
-
import org.apache.kyuubi.{KYUUBI_VERSION, Logging}
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN
import org.apache.kyuubi.config.KyuubiConf.ENGINE_TYPE
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider}
+import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo}
+import org.apache.kyuubi.ha.client.DiscoveryClient
+import org.apache.kyuubi.ha.client.DiscoveryPaths
private[ctl] object ServiceControlAction extends Enumeration {
type ServiceControlAction = Value
@@ -43,9 +42,8 @@ private[ctl] object ServiceControlObject extends Enumeration {
* Main gateway of launching a Kyuubi Ctl action.
*/
private[kyuubi] class ServiceControlCli extends Logging {
+ import DiscoveryClientProvider._
import ServiceControlCli._
- import ServiceDiscovery._
- import ZooKeeperClientProvider._
private var verbose: Boolean = false
@@ -84,21 +82,20 @@ private[kyuubi] class ServiceControlCli extends Logging {
val kyuubiConf = args.conf
kyuubiConf.setIfMissing(HA_ZK_QUORUM, args.cliArgs.zkQuorum)
- withZkClient(kyuubiConf) { zkClient =>
- val fromNamespace = ZKPaths.makePath(null, kyuubiConf.get(HA_ZK_NAMESPACE))
+ withDiscoveryClient(kyuubiConf) { discoveryClient =>
+ val fromNamespace = DiscoveryPaths.makePath(null, kyuubiConf.get(HA_ZK_NAMESPACE))
val toNamespace = getZkNamespace(args)
- val currentServerNodes = getServiceNodesInfo(zkClient, fromNamespace)
+ val currentServerNodes = discoveryClient.getServiceNodesInfo(fromNamespace)
val exposedServiceNodes = ListBuffer[ServiceNodeInfo]()
if (currentServerNodes.nonEmpty) {
- def doCreate(zc: CuratorFramework): Unit = {
+ def doCreate(zc: DiscoveryClient): Unit = {
currentServerNodes.foreach { sn =>
info(s"Exposing server instance:${sn.instance} with version:${sn.version}" +
s" from $fromNamespace to $toNamespace")
- val newNodePath = createAndGetServiceNode(
+ val newNodePath = zc.createAndGetServiceNode(
kyuubiConf,
- zc,
args.cliArgs.namespace,
sn.instance,
sn.version,
@@ -110,10 +107,10 @@ private[kyuubi] class ServiceControlCli extends Logging {
}
if (kyuubiConf.get(HA_ZK_QUORUM) == args.cliArgs.zkQuorum) {
- doCreate(zkClient)
+ doCreate(discoveryClient)
} else {
kyuubiConf.set(HA_ZK_QUORUM, args.cliArgs.zkQuorum)
- withZkClient(kyuubiConf)(doCreate)
+ withDiscoveryClient(kyuubiConf)(doCreate)
}
}
@@ -126,13 +123,13 @@ private[kyuubi] class ServiceControlCli extends Logging {
* List Kyuubi server nodes info.
*/
private def list(args: ServiceControlCliArguments, filterHostPort: Boolean): Unit = {
- withZkClient(args.conf) { zkClient =>
+ withDiscoveryClient(args.conf) { discoveryClient =>
val znodeRoot = getZkNamespace(args)
val hostPortOpt =
if (filterHostPort) {
Some((args.cliArgs.host, args.cliArgs.port.toInt))
} else None
- val nodes = getServiceNodes(zkClient, znodeRoot, hostPortOpt)
+ val nodes = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
val title = "Zookeeper service nodes"
info(renderServiceNodesInfo(title, nodes, verbose))
@@ -140,10 +137,10 @@ private[kyuubi] class ServiceControlCli extends Logging {
}
private def getServiceNodes(
- zkClient: CuratorFramework,
+ discoveryClient: DiscoveryClient,
znodeRoot: String,
hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = {
- val serviceNodes = getServiceNodesInfo(zkClient, znodeRoot)
+ val serviceNodes = discoveryClient.getServiceNodesInfo(znodeRoot)
hostPortOpt match {
case Some((host, port)) => serviceNodes.filter { sn =>
sn.host == host && sn.port == port
@@ -156,17 +153,17 @@ private[kyuubi] class ServiceControlCli extends Logging {
* Delete zookeeper service node with specified host port.
*/
private def delete(args: ServiceControlCliArguments): Unit = {
- withZkClient(args.conf) { zkClient =>
+ withDiscoveryClient(args.conf) { discoveryClient =>
val znodeRoot = getZkNamespace(args)
val hostPortOpt = Some((args.cliArgs.host, args.cliArgs.port.toInt))
- val nodesToDelete = getServiceNodes(zkClient, znodeRoot, hostPortOpt)
+ val nodesToDelete = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)
val deletedNodes = ListBuffer[ServiceNodeInfo]()
nodesToDelete.foreach { node =>
val nodePath = s"$znodeRoot/${node.nodeName}"
info(s"Deleting zookeeper service node:$nodePath")
try {
- zkClient.delete().forPath(nodePath)
+ discoveryClient.delete(nodePath)
deletedNodes += node
} catch {
case e: Exception =>
@@ -227,7 +224,7 @@ object ServiceControlCli extends CommandLineUtils with Logging {
private[ctl] def getZkNamespace(args: ServiceControlCliArguments): String = {
args.cliArgs.service match {
case ServiceControlObject.SERVER =>
- ZKPaths.makePath(null, args.cliArgs.namespace)
+ DiscoveryPaths.makePath(null, args.cliArgs.namespace)
case ServiceControlObject.ENGINE =>
val engineType = Some(args.cliArgs.engineType)
.filter(_ != null).filter(_.nonEmpty)
@@ -237,10 +234,10 @@ object ServiceControlCli extends CommandLineUtils with Logging {
.getOrElse(args.conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse("default"))
// The path of the engine defined in zookeeper comes from
// org.apache.kyuubi.engine.EngineRef#engineSpace
- ZKPaths.makePath(
+ DiscoveryPaths.makePath(
s"${args.cliArgs.namespace}_${KYUUBI_VERSION}_${ShareLevel.USER}_${engineType}",
args.cliArgs.user,
- engineSubdomain)
+ Array(engineSubdomain))
}
}
diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala
index 5c177ddccb7..0cae9d99263 100644
--- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala
+++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ServiceControlCliSuite.scala
@@ -20,13 +20,12 @@ package org.apache.kyuubi.ctl
import java.io.{OutputStream, PrintStream}
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_NAMESPACE, HA_ZK_QUORUM}
-import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider}
+import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo}
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
trait TestPrematureExit {
@@ -86,9 +85,8 @@ trait TestPrematureExit {
}
class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
+ import DiscoveryClientProvider._
import ServiceControlCli._
- import ServiceDiscovery._
- import ZooKeeperClientProvider._
val zkServer = new EmbeddedZookeeper()
val conf: KyuubiConf = KyuubiConf()
@@ -226,9 +224,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
System.setProperty(HA_ZK_NAMESPACE.key, uniqueNamespace)
- withZkClient(conf) { framework =>
- createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
- createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
+ withDiscoveryClient(conf) { framework =>
+ framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
+ framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")
val newNamespace = getUniqueNamespace()
val args = Array(
@@ -245,7 +243,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false))
val znodeRoot = s"/$newNamespace"
- val children = framework.getChildren.forPath(znodeRoot).asScala.sorted
+ val children = framework.getChildren(znodeRoot).sorted
assert(children.size == 2)
assert(children.head.startsWith(
@@ -253,7 +251,7 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
assert(children.last.startsWith(
s"serviceUri=localhost:10001;version=$KYUUBI_VERSION;sequence="))
children.foreach { child =>
- framework.delete().forPath(s"""$znodeRoot/$child""")
+ framework.delete(s"""$znodeRoot/$child""")
}
}
}
@@ -290,9 +288,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
- withZkClient(conf) { framework =>
- createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
- createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
+ withDiscoveryClient(conf) { framework =>
+ framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
+ framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")
val args = Array(
"list",
@@ -319,9 +317,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
- withZkClient(conf) { framework =>
- createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
- createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
+ withDiscoveryClient(conf) { framework =>
+ framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
+ framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")
val args = Array(
"get",
@@ -351,10 +349,10 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
- withZkClient(conf) { framework =>
- withZkClient(conf) { zc =>
- createAndGetServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true)
- createAndGetServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true)
+ withDiscoveryClient(conf) { framework =>
+ withDiscoveryClient(conf) { zc =>
+ framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000", external = true)
+ framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001", external = true)
}
val args = Array(
@@ -385,9 +383,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
- withZkClient(conf) { framework =>
- createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
- createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
+ withDiscoveryClient(conf) { framework =>
+ framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
+ framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")
val args = Array(
"list",
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
index 34a337c4d66..c6bd587b39f 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala
@@ -22,7 +22,8 @@ import java.time.Duration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry}
-import org.apache.kyuubi.ha.client.{RetryPolicies, ZooKeeperAuthTypes}
+import org.apache.kyuubi.ha.client.AuthTypes
+import org.apache.kyuubi.ha.client.RetryPolicies
object HighAvailabilityConf {
@@ -51,16 +52,16 @@ object HighAvailabilityConf {
val HA_ZK_AUTH_TYPE: ConfigEntry[String] =
buildConf("kyuubi.ha.zookeeper.auth.type")
.doc("The type of zookeeper authentication, all candidates are " +
- s"${ZooKeeperAuthTypes.values.mkString("
")}")
+ s"${AuthTypes.values.mkString("")}")
.version("1.3.2")
.stringConf
- .checkValues(ZooKeeperAuthTypes.values.map(_.toString))
- .createWithDefault(ZooKeeperAuthTypes.NONE.toString)
+ .checkValues(AuthTypes.values.map(_.toString))
+ .createWithDefault(AuthTypes.NONE.toString)
val HA_ZK_ENGINE_AUTH_TYPE: ConfigEntry[String] =
buildConf("kyuubi.ha.zookeeper.engine.auth.type")
.doc("The type of zookeeper authentication for engine, all candidates are " +
- s"${ZooKeeperAuthTypes.values.mkString("")}")
+ s"${AuthTypes.values.mkString("")}")
.version("1.3.2")
.fallbackConf(HA_ZK_AUTH_TYPE)
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/AuthTypes.scala
similarity index 91%
rename from kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala
rename to kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/AuthTypes.scala
index da28db730b0..0c02764fb9c 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperAuthTypes.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/AuthTypes.scala
@@ -17,9 +17,9 @@
package org.apache.kyuubi.ha.client
-object ZooKeeperAuthTypes extends Enumeration {
+object AuthTypes extends Enumeration {
- type ZooKeeperAuthType = Value
+ type AuthTypes = Value
val NONE, KERBEROS, DIGEST = Value
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
new file mode 100644
index 00000000000..1a390d41835
--- /dev/null
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+
+/**
+ * A collection of apis that discovery client need implement.
+ */
+trait DiscoveryClient extends Logging {
+
+ /**
+ * Create a discovery client.
+ */
+ def createClient(): Unit
+
+ /**
+ * Close the discovery client.
+ */
+ def closeClient(): Unit
+
+ /**
+ * Create path on discovery service.
+ */
+ def create(path: String, mode: String, createParent: Boolean = true): String
+
+ /**
+ * Get the stored data under path.
+ */
+ def getData(path: String): Array[Byte]
+
+ /**
+ * Get the paths under given path.
+ * @return list of path
+ */
+ def getChildren(path: String): List[String]
+
+ /**
+ * Check if the path is exists.
+ */
+ def pathExists(path: String): Boolean
+
+ /**
+ * Check if the path non exists.
+ */
+ def pathNonExists(path: String): Boolean
+
+ /**
+ * Delete a path.
+ * @param path the path to be deleted
+ * @param deleteChildren if true, will also delete children if they exist.
+ */
+ def delete(path: String, deleteChildren: Boolean = false): Unit
+
+ /**
+ * Add a monitor for serviceDiscovery. It is used to stop service discovery gracefully
+ * when disconnect.
+ */
+ def monitorState(serviceDiscovery: ServiceDiscovery): Unit
+
+ /**
+ * The distributed lock path used to ensure only once engine being created for non-CONNECTION
+ * share level.
+ */
+ def tryWithLock[T](
+ lockPath: String,
+ timeout: Long,
+ unit: TimeUnit = TimeUnit.MILLISECONDS)(f: => T): T
+
+ /**
+ * Get the engine address and port from engine space.
+ * @return engine host and port
+ */
+ def getServerHost(namespace: String): Option[(String, Int)]
+
+ /**
+ * Get engine info by engine ref id from engine space.
+ * @param namespace the path to get engine ref
+ * @param engineRefId engine ref id
+ * @return engine host and port
+ */
+ def getEngineByRefId(
+ namespace: String,
+ engineRefId: String): Option[(String, Int)]
+
+ /**
+ * Get service node info from server space.
+ * @param namespace the path to get node info
+ * @param sizeOpt how many nodes to pick
+ * @param silent if true, error message will not be logged
+ * @return Service node info
+ */
+ def getServiceNodesInfo(
+ namespace: String,
+ sizeOpt: Option[Int] = None,
+ silent: Boolean = false): Seq[ServiceNodeInfo]
+
+ /**
+ * Register Kyuubi instance on discovery service.
+ * @param conf Kyuubi config
+ * @param namespace the path to register instance
+ * @param serviceDiscovery service discovery
+ * @param version kyuubi version
+ * @param external if true,
+ * the service info will not be automatically deleted upon client's disconnect
+ */
+ def registerService(
+ conf: KyuubiConf,
+ namespace: String,
+ serviceDiscovery: ServiceDiscovery,
+ version: Option[String] = None,
+ external: Boolean = false): Unit
+
+ /**
+ * Deregister Kyuubi instance on discovery service.
+ */
+ def deregisterService(): Unit
+
+ /**
+ * Request remove Kyuubi instance on discovery service.
+ */
+ def postDeregisterService(namespace: String): Boolean
+
+ /**
+ * Create server service node info on discovery and get the actual path.
+ * @param conf Kyuubi config
+ * @param namespace the path to register instance
+ * @param instance server info, host:port
+ * @param version kyuubi version
+ * @param external if true,
+ * the service info will not be automatically deleted upon client's disconnect
+ */
+ def createAndGetServiceNode(
+ conf: KyuubiConf,
+ namespace: String,
+ instance: String,
+ version: Option[String] = None,
+ external: Boolean = false): String
+
+ /**
+ * Create a node to store engine secret.
+ * @param createMode create node mode, automatically deleted or not
+ * @param basePath the base path for the node
+ * @param initData the init data to be stored
+ * @param useProtection if true, createBuilder with protection
+ */
+ def startSecretNode(
+ createMode: String,
+ basePath: String,
+ initData: String,
+ useProtection: Boolean = false): Unit
+}
+
+object DiscoveryClient {
+
+ /**
+ * Parse instance info string, get host and port.
+ */
+ private[client] def parseInstanceHostPort(instance: String): (String, Int) = {
+ val maybeInfos = instance.split(";")
+ .map(_.split("=", 2))
+ .filter(_.size == 2)
+ .map(i => (i(0), i(1)))
+ .toMap
+ if (maybeInfos.size > 0) {
+ (
+ maybeInfos.get("hive.server2.thrift.bind.host").get,
+ maybeInfos.get("hive.server2.thrift.port").get.toInt)
+ } else {
+ val strings = instance.split(":")
+ (strings(0), strings(1).toInt)
+ }
+ }
+}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClientProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClientProvider.scala
new file mode 100644
index 00000000000..be908e2fc70
--- /dev/null
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClientProvider.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client
+
+import java.io.IOException
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClient
+
+object DiscoveryClientProvider extends Logging {
+
+ /**
+ * Creates a zookeeper client before calling `f` and close it after calling `f`.
+ */
+ def withDiscoveryClient[T](conf: KyuubiConf)(f: DiscoveryClient => T): T = {
+ val discoveryClient = new ZookeeperDiscoveryClient(conf)
+ try {
+ discoveryClient.createClient()
+ f(discoveryClient)
+ } finally {
+ try {
+ discoveryClient.closeClient()
+ } catch {
+ case e: IOException => error("Failed to release the zkClient", e)
+ }
+ }
+ }
+
+}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryPaths.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryPaths.scala
new file mode 100644
index 00000000000..4116fd6cf58
--- /dev/null
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryPaths.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client
+
+import org.apache.curator.utils.ZKPaths
+
+object DiscoveryPaths {
+ def makePath(parent: String, firstChild: String): String = {
+ ZKPaths.makePath(parent, firstChild)
+ }
+
+ def makePath(parent: String, firstChild: String, restChildren: Array[String]): String = {
+ ZKPaths.makePath(parent, firstChild, restChildren: _*)
+ }
+}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
index 98127edf340..88388f3ba8a 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/EngineServiceDiscovery.scala
@@ -37,7 +37,7 @@ class EngineServiceDiscovery(
// For connection level, we should clean up the namespace in zk in case the disk stress.
case "CONNECTION" =>
try {
- if (discoveryClient.postDeregisterService) {
+ if (discoveryClient.postDeregisterService(namespace)) {
info("Clean up discovery service due to this is connection share level.")
}
} catch {
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index 1723561a461..c65a4aa91da 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -17,20 +17,12 @@
package org.apache.kyuubi.ha.client
-import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicBoolean
-import scala.collection.JavaConverters._
-
-import com.google.common.annotations.VisibleForTesting
-import org.apache.curator.framework.CuratorFramework
-import org.apache.curator.utils.ZKPaths
-
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient
-import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode
+import org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClient
import org.apache.kyuubi.service.{AbstractService, FrontendService}
/**
@@ -45,21 +37,28 @@ abstract class ServiceDiscovery(
protected val isServerLost = new AtomicBoolean(false)
- private var _discoveryClient: ServiceDiscoveryClient = _
+ /**
+ * a pre-defined namespace used to publish the instance of the associate service
+ */
+ private var _namespace: String = _
+ private var _discoveryClient: DiscoveryClient = _
- def discoveryClient: ServiceDiscoveryClient = _discoveryClient
+ def namespace: String = _namespace
+ def discoveryClient: DiscoveryClient = _discoveryClient
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
- _discoveryClient = new ServiceDiscoveryClient(this)
- discoveryClient.createClient(conf)
+ _namespace = conf.get(HA_ZK_NAMESPACE)
+ _discoveryClient = new ZookeeperDiscoveryClient(conf)
+ discoveryClient.monitorState(this)
+ discoveryClient.createClient()
super.initialize(conf)
}
override def start(): Unit = {
- discoveryClient.registerService(conf)
+ discoveryClient.registerService(conf, namespace, this)
super.start()
}
@@ -80,74 +79,4 @@ object ServiceDiscovery extends Logging {
val zkEnsemble = conf.get(HA_ZK_QUORUM)
zkEnsemble != null && zkEnsemble.nonEmpty
}
-
- def getServerHost(zkClient: CuratorFramework, namespace: String): Option[(String, Int)] = {
- // TODO: use last one because to avoid touching some maybe-crashed engines
- // We need a big improvement here.
- getServiceNodesInfo(zkClient, namespace, Some(1), silent = true) match {
- case Seq(sn) => Some((sn.host, sn.port))
- case _ => None
- }
- }
-
- def getEngineByRefId(
- zkClient: CuratorFramework,
- namespace: String,
- engineRefId: String): Option[(String, Int)] = {
- getServiceNodesInfo(zkClient, namespace, silent = true)
- .find(_.engineRefId.exists(_.equals(engineRefId)))
- .map(data => (data.host, data.port))
- }
-
- def getServiceNodesInfo(
- zkClient: CuratorFramework,
- namespace: String,
- sizeOpt: Option[Int] = None,
- silent: Boolean = false): Seq[ServiceNodeInfo] = {
- try {
- val hosts = zkClient.getChildren.forPath(namespace)
- val size = sizeOpt.getOrElse(hosts.size())
- hosts.asScala.takeRight(size).map { p =>
- val path = ZKPaths.makePath(namespace, p)
- val instance = new String(zkClient.getData.forPath(path), StandardCharsets.UTF_8)
- val (host, port) = parseInstanceHostPort(instance)
- val version = p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version="))
- val engineRefId = p.split(";").find(_.startsWith("refId=")).map(_.stripPrefix("refId="))
- info(s"Get service instance:$instance and version:$version under $namespace")
- ServiceNodeInfo(namespace, p, host, port, version, engineRefId)
- }
- } catch {
- case _: Exception if silent => Nil
- case e: Exception =>
- error(s"Failed to get service node info", e)
- Nil
- }
- }
-
- @VisibleForTesting
- private[client] def parseInstanceHostPort(instance: String): (String, Int) = {
- val maybeInfos = instance.split(";")
- .map(_.split("=", 2))
- .filter(_.size == 2)
- .map(i => (i(0), i(1)))
- .toMap
- if (maybeInfos.size > 0) {
- (
- maybeInfos.get("hive.server2.thrift.bind.host").get,
- maybeInfos.get("hive.server2.thrift.port").get.toInt)
- } else {
- val strings = instance.split(":")
- (strings(0), strings(1).toInt)
- }
- }
-
- def createAndGetServiceNode(
- conf: KyuubiConf,
- zkClient: CuratorFramework,
- namespace: String,
- instance: String,
- version: Option[String] = None,
- external: Boolean = false): String = {
- createServiceNode(conf, zkClient, namespace, instance, version, external).getActualPath
- }
}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
deleted file mode 100644
index 98efe239974..00000000000
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ServiceDiscoveryClient.scala
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.ha.client.zookeeper
-
-import java.io.IOException
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
-
-import org.apache.curator.framework.CuratorFramework
-import org.apache.curator.framework.recipes.nodes.PersistentNode
-import org.apache.curator.framework.state.ConnectionState
-import org.apache.curator.framework.state.ConnectionState.CONNECTED
-import org.apache.curator.framework.state.ConnectionState.LOST
-import org.apache.curator.framework.state.ConnectionState.RECONNECTED
-import org.apache.curator.framework.state.ConnectionStateListener
-import org.apache.curator.utils.ZKPaths
-import org.apache.zookeeper.CreateMode
-import org.apache.zookeeper.CreateMode.PERSISTENT
-import org.apache.zookeeper.KeeperException
-import org.apache.zookeeper.KeeperException.NodeExistsException
-import org.apache.zookeeper.WatchedEvent
-import org.apache.zookeeper.Watcher
-
-import org.apache.kyuubi.KYUUBI_VERSION
-import org.apache.kyuubi.KyuubiException
-import org.apache.kyuubi.Logging
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NODE_TIMEOUT
-import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_PUBLISH_CONFIGS
-import org.apache.kyuubi.ha.client.ServiceDiscovery
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider.{buildZookeeperClient, getGracefulStopThreadDelay}
-import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.connectionChecker
-import org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.createServiceNode
-import org.apache.kyuubi.util.KyuubiHadoopUtils
-import org.apache.kyuubi.util.ThreadUtils
-
-class ServiceDiscoveryClient(serviceDiscovery: ServiceDiscovery) extends Logging {
-
- /**
- * a pre-defined namespace used to publish the instance of the associate service
- */
- protected var _namespace: String = _
-
- private lazy val instance: String = serviceDiscovery.fe.connectionUrl
- private var zkClient: CuratorFramework = _
- private var serviceNode: PersistentNode = _
-
- def namespace: String = _namespace
-
- def createClient(conf: KyuubiConf): Unit = {
- _namespace = conf.get(HA_ZK_NAMESPACE)
- zkClient = buildZookeeperClient(conf)
- zkClient.getConnectionStateListenable.addListener(new ConnectionStateListener {
- private val isConnected = new AtomicBoolean(false)
-
- override def stateChanged(client: CuratorFramework, newState: ConnectionState): Unit = {
- info(s"Zookeeper client connection state changed to: $newState")
- newState match {
- case CONNECTED | RECONNECTED => isConnected.set(true)
- case LOST =>
- isConnected.set(false)
- val delay = getGracefulStopThreadDelay(conf)
- connectionChecker.schedule(
- new Runnable {
- override def run(): Unit = if (!isConnected.get()) {
- error(s"Zookeeper client connection state changed to: $newState, but failed to" +
- s" reconnect in ${delay / 1000} seconds. Give up retry and stop gracefully . ")
- serviceDiscovery.stopGracefully(true)
- }
- },
- delay,
- TimeUnit.MILLISECONDS)
- case _ =>
- }
- }
- })
- zkClient.start()
- }
-
- def registerService(conf: KyuubiConf): Unit = {
- serviceNode = createServiceNode(conf, zkClient, namespace, instance)
- // Set a watch on the serviceNode
- val watcher = new DeRegisterWatcher
- if (zkClient.checkExists.usingWatcher(watcher).forPath(serviceNode.getActualPath) == null) {
- // No node exists, throw exception
- throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
- s"instance[${instance}] on ZooKeeper.")
- }
- }
-
- /**
- * Close the serviceNode if not closed yet
- * and the znode will be deleted upon the serviceNode closed.
- */
- def deregisterService(): Unit = {
- // close the EPHEMERAL_SEQUENTIAL node in zk
- if (serviceNode != null) {
- try {
- serviceNode.close()
- } catch {
- case e @ (_: IOException | _: KeeperException) =>
- error("Failed to close the persistent ephemeral znode" + serviceNode.getActualPath, e)
- } finally {
- serviceNode = null
- }
- }
- }
-
- def postDeregisterService(): Boolean = {
- if (namespace != null) {
- try {
- zkClient.delete().deletingChildrenIfNeeded().forPath(namespace)
- true
- } catch {
- case e: KeeperException =>
- warn(s"Failed to delete $namespace", e)
- false
- }
- } else {
- false
- }
- }
-
- def closeClient(): Unit = {
- if (zkClient != null) zkClient.close()
- }
-
- class DeRegisterWatcher extends Watcher {
- override def process(event: WatchedEvent): Unit = {
- if (event.getType == Watcher.Event.EventType.NodeDeleted) {
- warn(s"This Kyuubi instance ${instance} is now de-registered from" +
- s" ZooKeeper. The server will be shut down after the last client session completes.")
- serviceDiscovery.stopGracefully()
- }
- }
- }
-}
-
-object ServiceDiscoveryClient extends Logging {
- final private lazy val connectionChecker =
- ThreadUtils.newDaemonSingleThreadScheduledExecutor("zk-connection-checker")
-
- private[client] def createServiceNode(
- conf: KyuubiConf,
- zkClient: CuratorFramework,
- namespace: String,
- instance: String,
- version: Option[String] = None,
- external: Boolean = false): PersistentNode = {
- val ns = ZKPaths.makePath(null, namespace)
- try {
- zkClient
- .create()
- .creatingParentsIfNeeded()
- .withMode(PERSISTENT)
- .forPath(ns)
- } catch {
- case _: NodeExistsException => // do nothing
- case e: KeeperException =>
- throw new KyuubiException(s"Failed to create namespace '$ns'", e)
- }
-
- val session = conf.get(HA_ZK_ENGINE_REF_ID)
- .map(refId => s"refId=$refId;").getOrElse("")
- val pathPrefix = ZKPaths.makePath(
- namespace,
- s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=")
- var serviceNode: PersistentNode = null
- val createMode =
- if (external) CreateMode.PERSISTENT_SEQUENTIAL
- else CreateMode.EPHEMERAL_SEQUENTIAL
- val znodeData =
- if (conf.get(HA_ZK_PUBLISH_CONFIGS) && session.isEmpty) {
- addConfsToPublish(conf, instance)
- } else {
- instance
- }
- try {
- serviceNode = new PersistentNode(
- zkClient,
- createMode,
- false,
- pathPrefix,
- znodeData.getBytes(StandardCharsets.UTF_8))
- serviceNode.start()
- val znodeTimeout = conf.get(HA_ZK_NODE_TIMEOUT)
- if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.MILLISECONDS)) {
- throw new KyuubiException(s"Max znode creation wait time $znodeTimeout s exhausted")
- }
- info(s"Created a ${serviceNode.getActualPath} on ZooKeeper for KyuubiServer uri: " + instance)
- } catch {
- case e: Exception =>
- if (serviceNode != null) {
- serviceNode.close()
- }
- throw new KyuubiException(
- s"Unable to create a znode for this server instance: $instance",
- e)
- }
- serviceNode
- }
-
- /**
- * Refer to the implementation of HIVE-11581 to simplify user connection parameters.
- * https://issues.apache.org/jira/browse/HIVE-11581
- * HiveServer2 should store connection params in ZK
- * when using dynamic service discovery for simpler client connection string.
- */
- private[client] def addConfsToPublish(conf: KyuubiConf, instance: String): String = {
- if (!instance.contains(":")) {
- return instance
- }
- val hostPort = instance.split(":", 2)
- val confsToPublish = collection.mutable.Map[String, String]()
-
- // Hostname
- confsToPublish += ("hive.server2.thrift.bind.host" -> hostPort(0))
- // Transport mode
- confsToPublish += ("hive.server2.transport.mode" -> "binary")
- // Transport specific confs
- confsToPublish += ("hive.server2.thrift.port" -> hostPort(1))
- confsToPublish += ("hive.server2.thrift.sasl.qop" -> conf.get(KyuubiConf.SASL_QOP))
- // Auth specific confs
- val authenticationMethod = conf.get(KyuubiConf.AUTHENTICATION_METHOD).mkString(",")
- confsToPublish += ("hive.server2.authentication" -> authenticationMethod)
- if (authenticationMethod.equalsIgnoreCase("KERBEROS")) {
- confsToPublish += ("hive.server2.authentication.kerberos.principal" ->
- conf.get(KyuubiConf.SERVER_PRINCIPAL).map(KyuubiHadoopUtils.getServerPrincipal)
- .getOrElse(""))
- }
- confsToPublish.map { case (k, v) => k + "=" + v }.mkString(";")
- }
-}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperACLProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala
similarity index 86%
rename from kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperACLProvider.scala
rename to kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala
index 654cb8f8da5..40ead463304 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperACLProvider.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperACLProvider.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kyuubi.ha.client
+package org.apache.kyuubi.ha.client.zookeeper
import org.apache.curator.framework.api.ACLProvider
import org.apache.zookeeper.ZooDefs
@@ -23,8 +23,9 @@ import org.apache.zookeeper.data.ACL
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf
+import org.apache.kyuubi.ha.client.AuthTypes
-class ZooKeeperACLProvider(conf: KyuubiConf) extends ACLProvider {
+class ZookeeperACLProvider(conf: KyuubiConf) extends ACLProvider {
/**
* Return the ACL list to use by default.
@@ -53,15 +54,15 @@ class ZooKeeperACLProvider(conf: KyuubiConf) extends ACLProvider {
nodeAcls
}
- private def enabledServerAcls(): Boolean = ZooKeeperAuthTypes
+ private def enabledServerAcls(): Boolean = AuthTypes
.withName(conf.get(HighAvailabilityConf.HA_ZK_AUTH_TYPE)) match {
- case ZooKeeperAuthTypes.NONE => false
+ case AuthTypes.NONE => false
case _ => true
}
- private def enabledEngineAcls(): Boolean = ZooKeeperAuthTypes
+ private def enabledEngineAcls(): Boolean = AuthTypes
.withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE)) match {
- case ZooKeeperAuthTypes.NONE => false
+ case AuthTypes.NONE => false
case _ => true
}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala
similarity index 85%
rename from kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala
rename to kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala
index fa7d366edb1..31bf33837f4 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProvider.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kyuubi.ha.client
+package org.apache.kyuubi.ha.client.zookeeper
import java.io.{File, IOException}
import javax.security.auth.login.Configuration
@@ -30,15 +30,16 @@ import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManage
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
+import org.apache.kyuubi.ha.client.AuthTypes
+import org.apache.kyuubi.ha.client.RetryPolicies
+import org.apache.kyuubi.ha.client.RetryPolicies._
import org.apache.kyuubi.util.KyuubiHadoopUtils
-object ZooKeeperClientProvider extends Logging {
-
- import RetryPolicies._
+object ZookeeperClientProvider extends Logging {
/**
* Create a [[CuratorFramework]] instance to be used as the ZooKeeper client
- * Use the [[ZooKeeperACLProvider]] to create appropriate ACLs
+ * Use the [[ZookeeperACLProvider]] to create appropriate ACLs
*/
def buildZookeeperClient(conf: KyuubiConf): CuratorFramework = {
setUpZooKeeperAuth(conf)
@@ -61,7 +62,7 @@ object ZooKeeperClientProvider extends Logging {
.connectString(connectionStr)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
- .aclProvider(new ZooKeeperACLProvider(conf))
+ .aclProvider(new ZookeeperACLProvider(conf))
.retryPolicy(retryPolicy)
conf.get(HA_ZK_AUTH_DIGEST) match {
@@ -94,23 +95,6 @@ object ZooKeeperClientProvider extends Logging {
}
}
- /**
- * Creates a zookeeper client before calling `f` and close it after calling `f`.
- */
- def withZkClient[T](conf: KyuubiConf)(f: CuratorFramework => T): T = {
- val zkClient = buildZookeeperClient(conf)
- try {
- zkClient.start()
- f(zkClient)
- } finally {
- try {
- zkClient.close()
- } catch {
- case e: IOException => error("Failed to release the zkClient", e)
- }
- }
- }
-
/**
* For a kerberized cluster, we dynamically set up the client's JAAS conf.
*
@@ -136,10 +120,10 @@ object ZooKeeperClientProvider extends Logging {
}
if (conf.get(HA_ZK_ENGINE_REF_ID).isEmpty
- && ZooKeeperAuthTypes.withName(conf.get(HA_ZK_AUTH_TYPE)) == ZooKeeperAuthTypes.KERBEROS) {
+ && AuthTypes.withName(conf.get(HA_ZK_AUTH_TYPE)) == AuthTypes.KERBEROS) {
setupZkAuth()
- } else if (conf.get(HA_ZK_ENGINE_REF_ID).nonEmpty && ZooKeeperAuthTypes
- .withName(conf.get(HA_ZK_ENGINE_AUTH_TYPE)) == ZooKeeperAuthTypes.KERBEROS) {
+ } else if (conf.get(HA_ZK_ENGINE_REF_ID).nonEmpty && AuthTypes
+ .withName(conf.get(HA_ZK_ENGINE_AUTH_TYPE)) == AuthTypes.KERBEROS) {
setupZkAuth()
}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
new file mode 100644
index 00000000000..cb1c5294582
--- /dev/null
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client.zookeeper
+
+import java.io.IOException
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import com.google.common.annotations.VisibleForTesting
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
+import org.apache.curator.framework.recipes.nodes.PersistentNode
+import org.apache.curator.framework.state.ConnectionState
+import org.apache.curator.framework.state.ConnectionState.CONNECTED
+import org.apache.curator.framework.state.ConnectionState.LOST
+import org.apache.curator.framework.state.ConnectionState.RECONNECTED
+import org.apache.curator.framework.state.ConnectionStateListener
+import org.apache.curator.utils.ZKPaths
+import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.CreateMode.PERSISTENT
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.zookeeper.WatchedEvent
+import org.apache.zookeeper.Watcher
+
+import org.apache.kyuubi.KYUUBI_VERSION
+import org.apache.kyuubi.KyuubiException
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NODE_TIMEOUT
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_PUBLISH_CONFIGS
+import org.apache.kyuubi.ha.client.DiscoveryClient
+import org.apache.kyuubi.ha.client.ServiceDiscovery
+import org.apache.kyuubi.ha.client.ServiceNodeInfo
+import org.apache.kyuubi.ha.client.zookeeper.ZookeeperClientProvider.buildZookeeperClient
+import org.apache.kyuubi.ha.client.zookeeper.ZookeeperClientProvider.getGracefulStopThreadDelay
+import org.apache.kyuubi.ha.client.zookeeper.ZookeeperDiscoveryClient.connectionChecker
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+import org.apache.kyuubi.util.ThreadUtils
+
+class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
+
+ private val zkClient: CuratorFramework = buildZookeeperClient(conf)
+ private var serviceNode: PersistentNode = _
+
+ def createClient(): Unit = {
+ zkClient.start()
+ }
+
+ def closeClient(): Unit = {
+ if (zkClient != null) {
+ zkClient.close()
+ }
+ }
+
+ def create(path: String, mode: String, createParent: Boolean = true): String = {
+ val builder =
+ if (createParent) zkClient.create().creatingParentsIfNeeded() else zkClient.create()
+ builder
+ .withMode(CreateMode.valueOf(mode))
+ .forPath(path)
+ }
+
+ def getData(path: String): Array[Byte] = {
+ zkClient.getData.forPath(path)
+ }
+
+ def getChildren(path: String): List[String] = {
+ zkClient.getChildren.forPath(path).asScala.toList
+ }
+
+ def pathExists(path: String): Boolean = {
+ zkClient.checkExists().forPath(path) != null
+ }
+
+ def pathNonExists(path: String): Boolean = {
+ zkClient.checkExists().forPath(path) == null
+ }
+
+ def delete(path: String, deleteChildren: Boolean = false): Unit = {
+ if (deleteChildren) {
+ zkClient.delete().deletingChildrenIfNeeded().forPath(path)
+ } else {
+ zkClient.delete().forPath(path)
+ }
+ }
+
+ def monitorState(serviceDiscovery: ServiceDiscovery): Unit = {
+ zkClient
+ .getConnectionStateListenable.addListener(new ConnectionStateListener {
+ private val isConnected = new AtomicBoolean(false)
+
+ override def stateChanged(client: CuratorFramework, newState: ConnectionState): Unit = {
+ info(s"Zookeeper client connection state changed to: $newState")
+ newState match {
+ case CONNECTED | RECONNECTED => isConnected.set(true)
+ case LOST =>
+ isConnected.set(false)
+ val delay = getGracefulStopThreadDelay(conf)
+ connectionChecker.schedule(
+ new Runnable {
+ override def run(): Unit = if (!isConnected.get()) {
+ error(s"Zookeeper client connection state changed to: $newState," +
+ s" but failed to reconnect in ${delay / 1000} seconds." +
+ s" Give up retry and stop gracefully . ")
+ serviceDiscovery.stopGracefully(true)
+ }
+ },
+ delay,
+ TimeUnit.MILLISECONDS)
+ case _ =>
+ }
+ }
+ })
+ }
+
+ def tryWithLock[T](
+ lockPath: String,
+ timeout: Long,
+ unit: TimeUnit = TimeUnit.MILLISECONDS)(f: => T): T = {
+ var lock: InterProcessSemaphoreMutex = null
+ try {
+ try {
+ lock = new InterProcessSemaphoreMutex(zkClient, lockPath)
+ // Acquire a lease. If no leases are available, this method blocks until either the
+ // maximum number of leases is increased or another client/process closes a lease
+ lock.acquire(timeout, unit)
+ } catch {
+ case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e)
+ }
+ f
+ } finally {
+ try {
+ if (lock != null) {
+ lock.release()
+ }
+ } catch {
+ case _: Exception =>
+ }
+ }
+ }
+
+ def getServerHost(namespace: String): Option[(String, Int)] = {
+ // TODO: use last one because to avoid touching some maybe-crashed engines
+ // We need a big improvement here.
+ getServiceNodesInfo(namespace, Some(1), silent = true) match {
+ case Seq(sn) => Some((sn.host, sn.port))
+ case _ => None
+ }
+ }
+
+ def getEngineByRefId(
+ namespace: String,
+ engineRefId: String): Option[(String, Int)] = {
+ getServiceNodesInfo(namespace, silent = true)
+ .find(_.engineRefId.exists(_.equals(engineRefId)))
+ .map(data => (data.host, data.port))
+ }
+
+ def getServiceNodesInfo(
+ namespace: String,
+ sizeOpt: Option[Int] = None,
+ silent: Boolean = false): Seq[ServiceNodeInfo] = {
+ try {
+ val hosts = zkClient.getChildren.forPath(namespace)
+ val size = sizeOpt.getOrElse(hosts.size())
+ hosts.asScala.takeRight(size).map { p =>
+ val path = ZKPaths.makePath(namespace, p)
+ val instance = new String(zkClient.getData.forPath(path), StandardCharsets.UTF_8)
+ val (host, port) = DiscoveryClient.parseInstanceHostPort(instance)
+ val version = p.split(";").find(_.startsWith("version=")).map(_.stripPrefix("version="))
+ val engineRefId = p.split(";").find(_.startsWith("refId=")).map(_.stripPrefix("refId="))
+ info(s"Get service instance:$instance and version:$version under $namespace")
+ ServiceNodeInfo(namespace, p, host, port, version, engineRefId)
+ }
+ } catch {
+ case _: Exception if silent => Nil
+ case e: Exception =>
+ error(s"Failed to get service node info", e)
+ Nil
+ }
+ }
+
+ def registerService(
+ conf: KyuubiConf,
+ namespace: String,
+ serviceDiscovery: ServiceDiscovery,
+ version: Option[String] = None,
+ external: Boolean = false): Unit = {
+ val instance = serviceDiscovery.fe.connectionUrl
+ val watcher = new DeRegisterWatcher(instance, serviceDiscovery)
+ serviceNode = createPersistentNode(conf, namespace, instance, version, external)
+ // Set a watch on the serviceNode
+ if (zkClient.checkExists
+ .usingWatcher(watcher.asInstanceOf[Watcher]).forPath(serviceNode.getActualPath) == null) {
+ // No node exists, throw exception
+ throw new KyuubiException(s"Unable to create znode for this Kyuubi " +
+ s"instance[${instance}] on ZooKeeper.")
+ }
+ }
+
+ def deregisterService(): Unit = {
+ // close the EPHEMERAL_SEQUENTIAL node in zk
+ if (serviceNode != null) {
+ try {
+ serviceNode.close()
+ } catch {
+ case e @ (_: IOException | _: KeeperException) =>
+ error("Failed to close the persistent ephemeral znode" + serviceNode.getActualPath, e)
+ } finally {
+ serviceNode = null
+ }
+ }
+ }
+
+ def postDeregisterService(namespace: String): Boolean = {
+ if (namespace != null) {
+ try {
+ delete(namespace, true)
+ true
+ } catch {
+ case e: KeeperException =>
+ warn(s"Failed to delete $namespace", e)
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ def createAndGetServiceNode(
+ conf: KyuubiConf,
+ namespace: String,
+ instance: String,
+ version: Option[String] = None,
+ external: Boolean = false): String = {
+ createPersistentNode(conf, namespace, instance, version, external).getActualPath
+ }
+
+ @VisibleForTesting
+ def startSecretNode(
+ createMode: String,
+ basePath: String,
+ initData: String,
+ useProtection: Boolean = false): Unit = {
+ val secretNode = new PersistentNode(
+ zkClient,
+ CreateMode.valueOf(createMode),
+ useProtection,
+ basePath,
+ initData.getBytes(StandardCharsets.UTF_8))
+ secretNode.start()
+ }
+
+ /**
+ * Refer to the implementation of HIVE-11581 to simplify user connection parameters.
+ * https://issues.apache.org/jira/browse/HIVE-11581
+ * HiveServer2 should store connection params in ZK
+ * when using dynamic service discovery for simpler client connection string.
+ */
+ private[client] def addConfsToPublish(conf: KyuubiConf, instance: String): String = {
+ if (!instance.contains(":")) {
+ return instance
+ }
+ val hostPort = instance.split(":", 2)
+ val confsToPublish = collection.mutable.Map[String, String]()
+
+ // Hostname
+ confsToPublish += ("hive.server2.thrift.bind.host" -> hostPort(0))
+ // Transport mode
+ confsToPublish += ("hive.server2.transport.mode" -> "binary")
+ // Transport specific confs
+ confsToPublish += ("hive.server2.thrift.port" -> hostPort(1))
+ confsToPublish += ("hive.server2.thrift.sasl.qop" -> conf.get(KyuubiConf.SASL_QOP))
+ // Auth specific confs
+ val authenticationMethod = conf.get(KyuubiConf.AUTHENTICATION_METHOD).mkString(",")
+ confsToPublish += ("hive.server2.authentication" -> authenticationMethod)
+ if (authenticationMethod.equalsIgnoreCase("KERBEROS")) {
+ confsToPublish += ("hive.server2.authentication.kerberos.principal" ->
+ conf.get(KyuubiConf.SERVER_PRINCIPAL).map(KyuubiHadoopUtils.getServerPrincipal)
+ .getOrElse(""))
+ }
+ confsToPublish.map { case (k, v) => k + "=" + v }.mkString(";")
+ }
+
+ private def createPersistentNode(
+ conf: KyuubiConf,
+ namespace: String,
+ instance: String,
+ version: Option[String] = None,
+ external: Boolean = false): PersistentNode = {
+ val ns = ZKPaths.makePath(null, namespace)
+ try {
+ zkClient
+ .create()
+ .creatingParentsIfNeeded()
+ .withMode(PERSISTENT)
+ .forPath(ns)
+ } catch {
+ case _: NodeExistsException => // do nothing
+ case e: KeeperException =>
+ throw new KyuubiException(s"Failed to create namespace '$ns'", e)
+ }
+
+ val session = conf.get(HA_ZK_ENGINE_REF_ID)
+ .map(refId => s"refId=$refId;").getOrElse("")
+ val pathPrefix = ZKPaths.makePath(
+ namespace,
+ s"serviceUri=$instance;version=${version.getOrElse(KYUUBI_VERSION)};${session}sequence=")
+ var localServiceNode: PersistentNode = null
+ val createMode =
+ if (external) CreateMode.PERSISTENT_SEQUENTIAL
+ else CreateMode.EPHEMERAL_SEQUENTIAL
+ val znodeData =
+ if (conf.get(HA_ZK_PUBLISH_CONFIGS) && session.isEmpty) {
+ addConfsToPublish(conf, instance)
+ } else {
+ instance
+ }
+ try {
+ localServiceNode = new PersistentNode(
+ zkClient,
+ createMode,
+ false,
+ pathPrefix,
+ znodeData.getBytes(StandardCharsets.UTF_8))
+ localServiceNode.start()
+ val znodeTimeout = conf.get(HA_ZK_NODE_TIMEOUT)
+ if (!localServiceNode.waitForInitialCreate(znodeTimeout, TimeUnit.MILLISECONDS)) {
+ throw new KyuubiException(s"Max znode creation wait time $znodeTimeout s exhausted")
+ }
+ info(s"Created a ${localServiceNode.getActualPath} on ZooKeeper for KyuubiServer uri:" +
+ s" $instance")
+ } catch {
+ case e: Exception =>
+ if (localServiceNode != null) {
+ localServiceNode.close()
+ }
+ throw new KyuubiException(
+ s"Unable to create a znode for this server instance: $instance",
+ e)
+ }
+ localServiceNode
+ }
+
+ class DeRegisterWatcher(instance: String, serviceDiscovery: ServiceDiscovery) extends Watcher {
+ override def process(event: WatchedEvent): Unit = {
+ if (event.getType == Watcher.Event.EventType.NodeDeleted) {
+ warn(s"This Kyuubi instance ${instance} is now de-registered from" +
+ s" ZooKeeper. The server will be shut down after the last client session completes.")
+ serviceDiscovery.stopGracefully()
+ }
+ }
+ }
+}
+
+object ZookeeperDiscoveryClient extends Logging {
+ final private lazy val connectionChecker =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("zk-connection-checker")
+}
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala
index 03730795160..75af21e3aed 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala
@@ -21,10 +21,10 @@ import java.nio.charset.StandardCharsets
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_SECURE_SECRET_NODE
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
class ZooKeeperEngineSecuritySecretProviderImpl extends EngineSecuritySecretProvider {
- import ZooKeeperClientProvider._
+ import DiscoveryClientProvider._
private var conf: KyuubiConf = _
@@ -34,8 +34,8 @@ class ZooKeeperEngineSecuritySecretProviderImpl extends EngineSecuritySecretProv
override def getSecret(): String = {
conf.get(HA_ZK_ENGINE_SECURE_SECRET_NODE).map { zkNode =>
- withZkClient[String](conf) { zkClient =>
- new String(zkClient.getData.forPath(zkNode), StandardCharsets.UTF_8)
+ withDiscoveryClient[String](conf) { discoveryClient =>
+ new String(discoveryClient.getData(zkNode), StandardCharsets.UTF_8)
}
}.getOrElse(
throw new IllegalArgumentException(s"${HA_ZK_ENGINE_SECURE_SECRET_NODE.key} is not defined"))
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala
new file mode 100644
index 00000000000..ca2a4ba8839
--- /dev/null
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientProviderSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.ha.client
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+
+class DiscoveryClientProviderSuite extends KyuubiFunSuite {
+ test("discovery") {
+ val conf = KyuubiConf()
+ DiscoveryClientProvider.withDiscoveryClient(conf) { discoveryClient =>
+ discoveryClient.getServerHost("/kyuubi")
+ }
+ DiscoveryClientProvider.withDiscoveryClient(conf) { discoveryClient =>
+ discoveryClient.getServerHost("/kyuubi")
+ }
+ }
+}
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProviderSuite.scala
similarity index 69%
rename from kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala
rename to kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProviderSuite.scala
index 97cbdf8a5b3..48fccfe98a1 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProviderSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperClientProviderSuite.scala
@@ -15,13 +15,16 @@
* limitations under the License.
*/
-package org.apache.kyuubi.ha.client
+package org.apache.kyuubi.ha.client.zookeeper
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_CONN_BASE_RETRY_WAIT, HA_ZK_CONN_MAX_RETRIES, HA_ZK_CONN_MAX_RETRY_WAIT, HA_ZK_CONN_RETRY_POLICY}
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_BASE_RETRY_WAIT
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRIES
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_MAX_RETRY_WAIT
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY
-class ZooKeeperClientProviderSuite extends KyuubiFunSuite {
+class ZookeeperClientProviderSuite extends KyuubiFunSuite {
test("get graceful stop thread start delay") {
val conf = KyuubiConf()
@@ -29,23 +32,23 @@ class ZooKeeperClientProviderSuite extends KyuubiFunSuite {
val baseSleepTime = conf.get(HA_ZK_CONN_BASE_RETRY_WAIT)
val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
- val delay1 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+ val delay1 = ZookeeperClientProvider.getGracefulStopThreadDelay(conf)
assert(delay1 >= baseSleepTime * maxRetries)
conf.set(HA_ZK_CONN_RETRY_POLICY, "ONE_TIME")
- val delay2 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+ val delay2 = ZookeeperClientProvider.getGracefulStopThreadDelay(conf)
assert(delay2 === baseSleepTime)
conf.set(HA_ZK_CONN_RETRY_POLICY, "N_TIME")
- val delay3 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+ val delay3 = ZookeeperClientProvider.getGracefulStopThreadDelay(conf)
assert(delay3 === baseSleepTime * maxRetries)
conf.set(HA_ZK_CONN_RETRY_POLICY, "UNTIL_ELAPSED")
- val delay4 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+ val delay4 = ZookeeperClientProvider.getGracefulStopThreadDelay(conf)
assert(delay4 === maxSleepTime)
conf.set(HA_ZK_CONN_RETRY_POLICY, "BOUNDED_EXPONENTIAL_BACKOFF")
- val delay5 = ZooKeeperClientProvider.getGracefulStopThreadDelay(conf)
+ val delay5 = ZookeeperClientProvider.getGracefulStopThreadDelay(conf)
assert(delay5 >= baseSleepTime * maxRetries)
}
}
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
similarity index 83%
rename from kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
rename to kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
index 182bf56e413..30cf1aeabbe 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kyuubi.ha.client
+package org.apache.kyuubi.ha.client.zookeeper
import java.io.{File, IOException}
import java.net.InetAddress
@@ -33,11 +33,16 @@ import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.{KerberizedTestHelper, KYUUBI_VERSION}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf._
+import org.apache.kyuubi.ha.client.AuthTypes
+import org.apache.kyuubi.ha.client.DiscoveryClient
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
+import org.apache.kyuubi.ha.client.EngineServiceDiscovery
+import org.apache.kyuubi.ha.client.KyuubiServiceDiscovery
import org.apache.kyuubi.service._
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
-class ServiceDiscoverySuite extends KerberizedTestHelper {
- import ZooKeeperClientProvider._
+class ZookeeperDiscoveryClientSuite extends KerberizedTestHelper {
+ import DiscoveryClientProvider._
val zkServer = new EmbeddedZookeeper()
val conf: KyuubiConf = KyuubiConf()
@@ -80,17 +85,17 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
server.initialize(conf)
server.start()
val znodeRoot = s"/$namespace"
- withZkClient(conf) { framework =>
+ withDiscoveryClient(conf) { framework =>
try {
- assert(framework.checkExists().forPath("/abc") === null)
- assert(framework.checkExists().forPath(znodeRoot) !== null)
- val children = framework.getChildren.forPath(znodeRoot).asScala
+ assert(framework.pathNonExists("/abc"))
+ assert(framework.pathExists(znodeRoot))
+ val children = framework.getChildren(znodeRoot)
assert(children.head ===
s"serviceUri=${server.frontendServices.head.connectionUrl};" +
s"version=$KYUUBI_VERSION;sequence=0000000000")
children.foreach { child =>
- framework.delete().forPath(s"""$znodeRoot/$child""")
+ framework.delete(s"""$znodeRoot/$child""")
}
eventually(timeout(5.seconds), interval(100.millis)) {
assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
@@ -112,21 +117,21 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
assert(actual === expected)
}
- val acl = new ZooKeeperACLProvider(conf).getDefaultAcl
+ val acl = new ZookeeperACLProvider(conf).getDefaultAcl
assertACL(expectedNoACL, acl)
- val serverConf = conf.clone.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.KERBEROS.toString)
- val serverACL = new ZooKeeperACLProvider(serverConf).getDefaultAcl
+ val serverConf = conf.clone.set(HA_ZK_AUTH_TYPE, AuthTypes.KERBEROS.toString)
+ val serverACL = new ZookeeperACLProvider(serverConf).getDefaultAcl
assertACL(expectedEnableACL, serverACL)
val engineConf = serverConf.clone.set(HA_ZK_ENGINE_REF_ID, "ref")
- engineConf.set(HA_ZK_ENGINE_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
- val engineACL = new ZooKeeperACLProvider(engineConf).getDefaultAcl
+ engineConf.set(HA_ZK_ENGINE_AUTH_TYPE, AuthTypes.NONE.toString)
+ val engineACL = new ZookeeperACLProvider(engineConf).getDefaultAcl
assertACL(expectedNoACL, engineACL)
val enableEngineACLConf = serverConf.clone.set(HA_ZK_ENGINE_REF_ID, "ref")
- enableEngineACLConf.set(HA_ZK_ENGINE_AUTH_TYPE, ZooKeeperAuthTypes.KERBEROS.toString)
- val enableEngineACL = new ZooKeeperACLProvider(enableEngineACLConf).getDefaultAcl
+ enableEngineACLConf.set(HA_ZK_ENGINE_AUTH_TYPE, AuthTypes.KERBEROS.toString)
+ val enableEngineACL = new ZookeeperACLProvider(enableEngineACLConf).getDefaultAcl
assertACL(expectedEnableACL, enableEngineACL)
}
@@ -137,9 +142,9 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
conf.set(HA_ZK_AUTH_KEYTAB.key, keytab.getCanonicalPath)
conf.set(HA_ZK_AUTH_PRINCIPAL.key, principal)
- conf.set(HA_ZK_AUTH_TYPE.key, ZooKeeperAuthTypes.KERBEROS.toString)
+ conf.set(HA_ZK_AUTH_TYPE.key, AuthTypes.KERBEROS.toString)
- ZooKeeperClientProvider.setUpZooKeeperAuth(conf)
+ ZookeeperClientProvider.setUpZooKeeperAuth(conf)
val configuration = Configuration.getConfiguration
val entries = configuration.getAppConfigurationEntry("KyuubiZooKeeperClient")
@@ -151,7 +156,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
assert(options("useKeyTab").toString.toBoolean)
conf.set(HA_ZK_AUTH_KEYTAB.key, s"${keytab.getName}")
- val e = intercept[IOException](ZooKeeperClientProvider.setUpZooKeeperAuth(conf))
+ val e = intercept[IOException](ZookeeperClientProvider.setUpZooKeeperAuth(conf))
assert(e.getMessage === s"${HA_ZK_AUTH_KEYTAB.key} does not exists")
}
}
@@ -167,7 +172,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
.set(HA_ZK_QUORUM, zkServer.getConnectString)
.set(HA_ZK_NAMESPACE, namespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
- .set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
+ .set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString)
var serviceDiscovery: KyuubiServiceDiscovery = null
val server: Serverable = new NoopTBinaryFrontendServer() {
@@ -183,18 +188,18 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
server.start()
val znodeRoot = s"/$namespace"
- withZkClient(conf) { framework =>
+ withDiscoveryClient(conf) { framework =>
try {
- assert(framework.checkExists().forPath("/abc") === null)
- assert(framework.checkExists().forPath(znodeRoot) !== null)
- val children = framework.getChildren.forPath(znodeRoot).asScala
+ assert(framework.pathNonExists("/abc"))
+ assert(framework.pathExists(znodeRoot))
+ val children = framework.getChildren(znodeRoot)
assert(children.head ===
s"serviceUri=${server.frontendServices.head.connectionUrl};" +
s"version=$KYUUBI_VERSION;sequence=0000000000")
children.foreach { child =>
- framework.delete().forPath(s"""$znodeRoot/$child""")
+ framework.delete(s"""$znodeRoot/$child""")
}
eventually(timeout(5.seconds), interval(100.millis)) {
assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
@@ -216,7 +221,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
val host = "127.0.0.1"
val port = 10009
val instance1 = s"$host:$port"
- val (host1, port1) = ServiceDiscovery.parseInstanceHostPort(instance1)
+ val (host1, port1) = DiscoveryClient.parseInstanceHostPort(instance1)
assert(host === host1)
assert(port === port1)
@@ -224,7 +229,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
s"hive.server2.transport.mode=binary;hive.server2.authentication=KERBEROS;" +
s"hive.server2.thrift.port=$port;" +
s"hive.server2.authentication.kerberos.principal=test/_HOST@apache.org"
- val (host2, port2) = ServiceDiscovery.parseInstanceHostPort(instance2)
+ val (host2, port2) = DiscoveryClient.parseInstanceHostPort(instance2)
assert(host === host2)
assert(port === port2)
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index e0b0a1c6c9b..894acfdbbf1 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -24,9 +24,6 @@ import scala.util.Random
import com.codahale.metrics.MetricRegistry
import com.google.common.annotations.VisibleForTesting
-import org.apache.curator.framework.CuratorFramework
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
-import org.apache.curator.utils.ZKPaths
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils}
@@ -41,8 +38,8 @@ import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.engine.trino.TrinoProcessBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
-import org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineByRefId
-import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost
+import org.apache.kyuubi.ha.client.DiscoveryClient
+import org.apache.kyuubi.ha.client.DiscoveryPaths
import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.log.OperationLog
@@ -135,8 +132,8 @@ private[kyuubi] class EngineRef(
private[kyuubi] lazy val engineSpace: String = {
val commonParent = s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType"
shareLevel match {
- case CONNECTION => ZKPaths.makePath(commonParent, appUser, engineRefId)
- case _ => ZKPaths.makePath(commonParent, appUser, subdomain)
+ case CONNECTION => DiscoveryPaths.makePath(commonParent, appUser, Array(engineRefId))
+ case _ => DiscoveryPaths.makePath(commonParent, appUser, Array(subdomain))
}
}
@@ -144,38 +141,23 @@ private[kyuubi] class EngineRef(
* The distributed lock path used to ensure only once engine being created for non-CONNECTION
* share level.
*/
- private def tryWithLock[T](zkClient: CuratorFramework)(f: => T): T = shareLevel match {
- case CONNECTION => f
- case _ =>
- val lockPath =
- ZKPaths.makePath(s"${serverSpace}_$shareLevel", "lock", appUser, subdomain)
- var lock: InterProcessSemaphoreMutex = null
- try {
- try {
- lock = new InterProcessSemaphoreMutex(zkClient, lockPath)
- // Acquire a lease. If no leases are available, this method blocks until either the
- // maximum number of leases is increased or another client/process closes a lease
- lock.acquire(timeout, TimeUnit.MILLISECONDS)
- } catch {
- case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e)
- }
- f
- } finally {
- try {
- if (lock != null) {
- lock.release()
- }
- } catch {
- case _: Exception =>
- }
- }
- }
+ private def tryWithLock[T](discoveryClient: DiscoveryClient)(f: => T): T =
+ shareLevel match {
+ case CONNECTION => f
+ case _ =>
+ val lockPath =
+ DiscoveryPaths.makePath(
+ s"${serverSpace}_$shareLevel",
+ "lock",
+ Array(appUser, subdomain))
+ discoveryClient.tryWithLock(lockPath, timeout, TimeUnit.MILLISECONDS)(f)
+ }
private def create(
- zkClient: CuratorFramework,
- extraEngineLog: Option[OperationLog]): (String, Int) = tryWithLock(zkClient) {
+ discoveryClient: DiscoveryClient,
+ extraEngineLog: Option[OperationLog]): (String, Int) = tryWithLock(discoveryClient) {
// Get the engine address ahead if another process has succeeded
- var engineRef = getServerHost(zkClient, engineSpace)
+ var engineRef = discoveryClient.getServerHost(engineSpace)
if (engineRef.nonEmpty) return engineRef.get
conf.set(HA_ZK_NAMESPACE, engineSpace)
@@ -228,7 +210,7 @@ private[kyuubi] class EngineRef(
s"Timeout($timeout ms) to launched $engineType engine with $builder. $killMessage",
builder.getError)
}
- engineRef = getEngineByRefId(zkClient, engineSpace, engineRefId)
+ engineRef = discoveryClient.getEngineByRefId(engineSpace, engineRefId)
}
engineRef.get
} finally {
@@ -241,15 +223,15 @@ private[kyuubi] class EngineRef(
/**
* Get the engine ref from engine space first or create a new one
*
- * @param zkClient the zookeeper client to get or create engine instance
+ * @param discoveryClient the zookeeper client to get or create engine instance
* @param extraEngineLog the launch engine operation log, used to inject engine log into it
*/
def getOrCreate(
- zkClient: CuratorFramework,
+ discoveryClient: DiscoveryClient,
extraEngineLog: Option[OperationLog] = None): (String, Int) = {
- getServerHost(zkClient, engineSpace)
+ discoveryClient.getServerHost(engineSpace)
.getOrElse {
- create(zkClient, extraEngineLog)
+ create(discoveryClient, extraEngineLog)
}
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index fbb10d64ef4..7e44da4bd56 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -33,7 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAIN_RESOURCE
import org.apache.kyuubi.engine.ProcBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf
-import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
+import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.KyuubiHadoopUtils
@@ -116,8 +116,8 @@ class SparkProcessBuilder(
var allConf = conf.getAll
// if enable sasl kerberos authentication for zookeeper, need to upload the server ketab file
- if (ZooKeeperAuthTypes.withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE))
- == ZooKeeperAuthTypes.KERBEROS) {
+ if (AuthTypes.withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE))
+ == AuthTypes.KERBEROS) {
allConf = allConf ++ zkAuthKeytabFileConf(allConf)
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index 9c461cdf705..341e9d0fd9c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -22,9 +22,7 @@ import java.util
import scala.util.Properties
-import org.apache.curator.utils.ZKPaths
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.zookeeper.CreateMode.PERSISTENT
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.NodeExistsException
@@ -35,8 +33,10 @@ import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._
import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent, KyuubiServerInfoEvent}
import org.apache.kyuubi.events.handler.ServerJsonLoggingEventHandler
import org.apache.kyuubi.ha.HighAvailabilityConf._
-import org.apache.kyuubi.ha.client.{ServiceDiscovery, ZooKeeperAuthTypes}
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
+import org.apache.kyuubi.ha.client.AuthTypes
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider._
+import org.apache.kyuubi.ha.client.DiscoveryPaths
+import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem}
import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState}
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
@@ -51,7 +51,7 @@ object KyuubiServer extends Logging {
zkServer.initialize(conf)
zkServer.start()
conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
- conf.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
+ conf.set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString)
} else {
// create chroot path if necessary
val connectionStr = conf.get(HA_ZK_QUORUM)
@@ -71,15 +71,11 @@ object KyuubiServer extends Logging {
chrootOption.foreach { chroot =>
val zkConnectionForChrootCreation = connectionStr.substring(0, chrootIndex)
val overrideQuorumConf = conf.clone.set(HA_ZK_QUORUM, zkConnectionForChrootCreation)
- withZkClient(overrideQuorumConf) { zkClient =>
- if (zkClient.checkExists().forPath(chroot) == null) {
- val chrootPath = ZKPaths.makePath(null, chroot)
+ withDiscoveryClient(overrideQuorumConf) { discoveryClient =>
+ if (discoveryClient.pathNonExists(chroot)) {
+ val chrootPath = DiscoveryPaths.makePath(null, chroot)
try {
- zkClient
- .create()
- .creatingParentsIfNeeded()
- .withMode(PERSISTENT)
- .forPath(chrootPath)
+ discoveryClient.create(chrootPath, "PERSISTENT")
} catch {
case _: NodeExistsException => // do nothing
case e: KeeperException =>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 1c4c09eb477..cec83ac1b18 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -28,7 +28,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.EngineRef
import org.apache.kyuubi.events.{EventBus, KyuubiEvent, KyuubiSessionEvent}
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider._
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationState}
@@ -93,8 +93,8 @@ class KyuubiSessionImpl(
}
private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = {
- withZkClient(sessionConf) { zkClient =>
- val (host, port) = engine.getOrCreate(zkClient, extraEngineLog)
+ withDiscoveryClient(sessionConf) { discoveryClient =>
+ val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
val passwd =
if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
EngineSecurityAccessor.get().issueToken()
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
index 38171cefabd..075c79faa66 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
@@ -21,7 +21,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_AUTH_TYPE, HA_ZK_QUORUM}
-import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
+import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.server.KyuubiServer
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
@@ -48,7 +48,7 @@ trait WithKyuubiServer extends KyuubiFunSuite {
zkServer.initialize(conf)
zkServer.start()
conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
- conf.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
+ conf.set(HA_ZK_AUTH_TYPE, AuthTypes.NONE.toString)
conf.set("spark.ui.enabled", "false")
conf.setIfMissing("spark.sql.catalogImplementation", "in-memory")
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
index 861dd0f02aa..8b82db99018 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.engine
import java.util.UUID
-import org.apache.curator.utils.ZKPaths
import org.apache.hadoop.security.UserGroupInformation
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
@@ -27,7 +26,8 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.ha.HighAvailabilityConf
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
+import org.apache.kyuubi.ha.client.DiscoveryPaths
import org.apache.kyuubi.util.NamedThreadFactory
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
@@ -67,7 +67,10 @@ class EngineRefSuite extends KyuubiFunSuite {
domain.foreach(conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, _))
val engine = new EngineRef(conf, user, id)
assert(engine.engineSpace ===
- ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${CONNECTION}_${engineType}", user, id))
+ DiscoveryPaths.makePath(
+ s"kyuubi_${KYUUBI_VERSION}_${CONNECTION}_${engineType}",
+ user,
+ Array(id)))
assert(engine.defaultEngineName === s"kyuubi_${CONNECTION}_${engineType}_${user}_$id")
}
}
@@ -78,7 +81,10 @@ class EngineRefSuite extends KyuubiFunSuite {
conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString)
val appName = new EngineRef(conf, user, id)
assert(appName.engineSpace ===
- ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${USER}_$FLINK_SQL", user, "default"))
+ DiscoveryPaths.makePath(
+ s"kyuubi_${KYUUBI_VERSION}_${USER}_$FLINK_SQL",
+ user,
+ Array("default")))
assert(appName.defaultEngineName === s"kyuubi_${USER}_${FLINK_SQL}_${user}_default_$id")
Seq(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN, KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN).foreach {
@@ -87,7 +93,10 @@ class EngineRefSuite extends KyuubiFunSuite {
conf.set(k.key, "abc")
val appName2 = new EngineRef(conf, user, id)
assert(appName2.engineSpace ===
- ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${USER}_${FLINK_SQL}", user, "abc"))
+ DiscoveryPaths.makePath(
+ s"kyuubi_${KYUUBI_VERSION}_${USER}_${FLINK_SQL}",
+ user,
+ Array("abc")))
assert(appName2.defaultEngineName === s"kyuubi_${USER}_${FLINK_SQL}_${user}_abc_$id")
}
}
@@ -99,7 +108,10 @@ class EngineRefSuite extends KyuubiFunSuite {
val engineRef = new EngineRef(conf, user, id)
val primaryGroupName = UserGroupInformation.createRemoteUser(user).getPrimaryGroupName
assert(engineRef.engineSpace ===
- ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL", primaryGroupName, "default"))
+ DiscoveryPaths.makePath(
+ s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL",
+ primaryGroupName,
+ Array("default")))
assert(engineRef.defaultEngineName ===
s"kyuubi_GROUP_SPARK_SQL_${primaryGroupName}_default_$id")
@@ -109,10 +121,10 @@ class EngineRefSuite extends KyuubiFunSuite {
conf.set(k.key, "abc")
val engineRef2 = new EngineRef(conf, user, id)
assert(engineRef2.engineSpace ===
- ZKPaths.makePath(
+ DiscoveryPaths.makePath(
s"kyuubi_${KYUUBI_VERSION}_${GROUP}_${SPARK_SQL}",
primaryGroupName,
- "abc"))
+ Array("abc")))
assert(engineRef2.defaultEngineName ===
s"kyuubi_${GROUP}_${SPARK_SQL}_${primaryGroupName}_abc_$id")
}
@@ -122,7 +134,10 @@ class EngineRefSuite extends KyuubiFunSuite {
assert(newUGI.getGroupNames.isEmpty)
val engineRef3 = new EngineRef(conf, userName, id)
assert(engineRef3.engineSpace ===
- ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL", userName, "abc"))
+ DiscoveryPaths.makePath(
+ s"kyuubi_${KYUUBI_VERSION}_GROUP_SPARK_SQL",
+ userName,
+ Array("abc")))
assert(engineRef3.defaultEngineName === s"kyuubi_GROUP_SPARK_SQL_${userName}_abc_$id")
}
@@ -132,13 +147,19 @@ class EngineRefSuite extends KyuubiFunSuite {
conf.set(KyuubiConf.ENGINE_TYPE, FLINK_SQL.toString)
val appName = new EngineRef(conf, user, id)
assert(appName.engineSpace ===
- ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}", user, "default"))
+ DiscoveryPaths.makePath(
+ s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}",
+ user,
+ Array("default")))
assert(appName.defaultEngineName === s"kyuubi_${SERVER}_${FLINK_SQL}_${user}_default_$id")
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "abc")
val appName2 = new EngineRef(conf, user, id)
assert(appName2.engineSpace ===
- ZKPaths.makePath(s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}", user, "abc"))
+ DiscoveryPaths.makePath(
+ s"kyuubi_${KYUUBI_VERSION}_${SERVER}_${FLINK_SQL}",
+ user,
+ Array("abc")))
assert(appName2.defaultEngineName === s"kyuubi_${SERVER}_${FLINK_SQL}_${user}_abc_$id")
}
@@ -200,7 +221,7 @@ class EngineRefSuite extends KyuubiFunSuite {
val r1 = new Runnable {
override def run(): Unit = {
- ZooKeeperClientProvider.withZkClient(conf) { client =>
+ DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
val hp = engine.getOrCreate(client)
port1 = hp._2
}
@@ -209,7 +230,7 @@ class EngineRefSuite extends KyuubiFunSuite {
val r2 = new Runnable {
override def run(): Unit = {
- ZooKeeperClientProvider.withZkClient(conf) { client =>
+ DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
val hp = engine.getOrCreate(client)
port2 = hp._2
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index 45ff4b3bd78..46525436c30 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_LOG_TIMEOUT, ENGINE_SPARK_MAIN_RESOURCE}
import org.apache.kyuubi.ha.HighAvailabilityConf
-import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
+import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.service.ServiceUtils
class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
@@ -281,7 +281,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
test("zookeeper kerberos authentication") {
val conf = KyuubiConf()
- conf.set(HighAvailabilityConf.HA_ZK_AUTH_TYPE.key, ZooKeeperAuthTypes.KERBEROS.toString)
+ conf.set(HighAvailabilityConf.HA_ZK_AUTH_TYPE.key, AuthTypes.KERBEROS.toString)
conf.set(HighAvailabilityConf.HA_ZK_AUTH_KEYTAB.key, testKeytab)
conf.set(HighAvailabilityConf.HA_ZK_AUTH_PRINCIPAL.key, testPrincipal)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala
index ae5d611e09a..5face684f63 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala
@@ -17,19 +17,14 @@
package org.apache.kyuubi.operation
-import java.nio.charset.StandardCharsets
-
-import org.apache.curator.framework.recipes.nodes.PersistentNode
-import org.apache.zookeeper.CreateMode
-
import org.apache.kyuubi.WithKyuubiServer
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
import org.apache.kyuubi.service.authentication.{EngineSecurityAccessor, ZooKeeperEngineSecuritySecretProviderImpl}
class KyuubiOperationWithEngineSecurity extends WithKyuubiServer with HiveJDBCTestHelper {
- import ZooKeeperClientProvider._
+ import DiscoveryClientProvider._
override protected def jdbcUrl: String = getJdbcUrl
@@ -46,15 +41,9 @@ class KyuubiOperationWithEngineSecurity extends WithKyuubiServer with HiveJDBCTe
override def beforeAll(): Unit = {
super.beforeAll()
- withZkClient(conf) { zkClient =>
- zkClient.create().withMode(CreateMode.PERSISTENT).forPath(engineSecretNode)
- val secretNode = new PersistentNode(
- zkClient,
- CreateMode.PERSISTENT,
- false,
- engineSecretNode,
- "_ENGINE_SECRET_".getBytes(StandardCharsets.UTF_8))
- secretNode.start()
+ withDiscoveryClient(conf) { discoveryClient =>
+ discoveryClient.create(engineSecretNode, "PERSISTENT", false)
+ discoveryClient.startSecretNode("PERSISTENT", engineSecretNode, "_ENGINE_SECRET_")
}
conf.set(KyuubiConf.ENGINE_SECURITY_ENABLED, true)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala
index 62284fd9d19..fba26709d4c 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.server
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf
-import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
+import org.apache.kyuubi.ha.client.DiscoveryClientProvider
import org.apache.kyuubi.service.ServiceState._
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
@@ -116,16 +116,16 @@ class KyuubiServerSuite extends KyuubiFunSuite {
val chrootPath = "/lake"
conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkConnection)
// chroot path does not exist before server start
- ZooKeeperClientProvider.withZkClient(conf) { client =>
- assert(client.checkExists().forPath(chrootPath) == null)
+ DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
+ assert(client.pathNonExists(chrootPath))
}
val zkWithChroot = zkConnection + chrootPath
val chrootConf = conf.clone.set(HighAvailabilityConf.HA_ZK_QUORUM, zkWithChroot)
server = KyuubiServer.startServer(chrootConf)
// chroot path exists after server started
- ZooKeeperClientProvider.withZkClient(conf) { client =>
- assert(client.checkExists().forPath(chrootPath) != null)
+ DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
+ assert(client.pathExists(chrootPath))
}
}
}