Skip to content

Commit

Permalink
[KYUUBI apache#1989] Decouple curator from other modules
Browse files Browse the repository at this point in the history
  • Loading branch information
hddong committed Mar 4, 2022
1 parent faa2dd4 commit 278a0f9
Show file tree
Hide file tree
Showing 25 changed files with 471 additions and 306 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,18 @@ abstract class ShareLevelSparkEngineSuite
}

test("check discovery service is clean up with different share level") {
withZkClient { zkClient =>
withDiscoveryClient { discoveryClient =>
assert(engine.getServiceState == ServiceState.STARTED)
assert(zkClient.checkExists().forPath(namespace) != null)
assert(discoveryClient.pathExists(namespace))
withJdbcStatement() { _ => }
shareLevel match {
// Connection level, we will cleanup namespace since it's always a global unique value.
case ShareLevel.CONNECTION =>
assert(engine.getServiceState == ServiceState.STOPPED)
assert(zkClient.checkExists().forPath(namespace) == null)
assert(discoveryClient.pathNonExists(namespace))
case _ =>
assert(engine.getServiceState == ServiceState.STARTED)
assert(zkClient.checkExists().forPath(namespace) != null)
assert(discoveryClient.pathExists(namespace))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.kyuubi.engine.spark

import org.apache.curator.framework.CuratorFramework

import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_AUTH_TYPE, HA_ZK_NAMESPACE, HA_ZK_QUORUM}
import org.apache.kyuubi.ha.client.{ZooKeeperAuthTypes, ZooKeeperClientProvider}
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.ha.client.DiscoveryClientProvider
import org.apache.kyuubi.ha.client.zookeeper.DiscoveryClient
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}

trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
Expand All @@ -32,7 +32,7 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
assert(zkServer != null)
Map(
HA_ZK_QUORUM.key -> zkServer.getConnectString,
HA_ZK_AUTH_TYPE.key -> ZooKeeperAuthTypes.NONE.toString,
HA_ZK_AUTH_TYPE.key -> AuthTypes.NONE.toString,
HA_ZK_NAMESPACE.key -> namespace)
}

Expand Down Expand Up @@ -62,8 +62,8 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
stopSparkEngine()
}

def withZkClient(f: CuratorFramework => Unit): Unit = {
ZooKeeperClientProvider.withZkClient(kyuubiConf)(f)
def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = {
DiscoveryClientProvider.withDiscoveryClient(kyuubiConf)(f)
}

protected def getDiscoveryConnectionString: String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ package org.apache.kyuubi.ctl

import scala.collection.mutable.ListBuffer

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.utils.ZKPaths

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN
import org.apache.kyuubi.config.KyuubiConf.ENGINE_TYPE
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider}
import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo}
import org.apache.kyuubi.ha.client.zookeeper.DiscoveryClient

private[ctl] object ServiceControlAction extends Enumeration {
type ServiceControlAction = Value
Expand All @@ -43,9 +41,8 @@ private[ctl] object ServiceControlObject extends Enumeration {
* Main gateway of launching a Kyuubi Ctl action.
*/
private[kyuubi] class ServiceControlCli extends Logging {
import DiscoveryClientProvider._
import ServiceControlCli._
import ServiceDiscovery._
import ZooKeeperClientProvider._

private var verbose: Boolean = false

Expand Down Expand Up @@ -84,21 +81,20 @@ private[kyuubi] class ServiceControlCli extends Logging {
val kyuubiConf = args.conf

kyuubiConf.setIfMissing(HA_ZK_QUORUM, args.cliArgs.zkQuorum)
withZkClient(kyuubiConf) { zkClient =>
val fromNamespace = ZKPaths.makePath(null, kyuubiConf.get(HA_ZK_NAMESPACE))
withDiscoveryClient(kyuubiConf) { discoveryClient =>
val fromNamespace = DiscoveryClient.makePath(null, kyuubiConf.get(HA_ZK_NAMESPACE))
val toNamespace = getZkNamespace(args)

val currentServerNodes = getServiceNodesInfo(zkClient, fromNamespace)
val currentServerNodes = discoveryClient.getServiceNodesInfo(fromNamespace)
val exposedServiceNodes = ListBuffer[ServiceNodeInfo]()

if (currentServerNodes.nonEmpty) {
def doCreate(zc: CuratorFramework): Unit = {
def doCreate(zc: DiscoveryClient): Unit = {
currentServerNodes.foreach { sn =>
info(s"Exposing server instance:${sn.instance} with version:${sn.version}" +
s" from $fromNamespace to $toNamespace")
val newNodePath = createAndGetServiceNode(
val newNodePath = zc.createAndGetServiceNode(
kyuubiConf,
zc,
args.cliArgs.namespace,
sn.instance,
sn.version,
Expand All @@ -110,10 +106,10 @@ private[kyuubi] class ServiceControlCli extends Logging {
}

if (kyuubiConf.get(HA_ZK_QUORUM) == args.cliArgs.zkQuorum) {
doCreate(zkClient)
doCreate(discoveryClient)
} else {
kyuubiConf.set(HA_ZK_QUORUM, args.cliArgs.zkQuorum)
withZkClient(kyuubiConf)(doCreate)
withDiscoveryClient(kyuubiConf)(doCreate)
}
}

Expand All @@ -126,24 +122,24 @@ private[kyuubi] class ServiceControlCli extends Logging {
* List Kyuubi server nodes info.
*/
private def list(args: ServiceControlCliArguments, filterHostPort: Boolean): Unit = {
withZkClient(args.conf) { zkClient =>
withDiscoveryClient(args.conf) { discoveryClient =>
val znodeRoot = getZkNamespace(args)
val hostPortOpt =
if (filterHostPort) {
Some((args.cliArgs.host, args.cliArgs.port.toInt))
} else None
val nodes = getServiceNodes(zkClient, znodeRoot, hostPortOpt)
val nodes = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)

val title = "Zookeeper service nodes"
info(renderServiceNodesInfo(title, nodes, verbose))
}
}

private def getServiceNodes(
zkClient: CuratorFramework,
discoveryClient: DiscoveryClient,
znodeRoot: String,
hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = {
val serviceNodes = getServiceNodesInfo(zkClient, znodeRoot)
val serviceNodes = discoveryClient.getServiceNodesInfo(znodeRoot)
hostPortOpt match {
case Some((host, port)) => serviceNodes.filter { sn =>
sn.host == host && sn.port == port
Expand All @@ -156,17 +152,17 @@ private[kyuubi] class ServiceControlCli extends Logging {
* Delete zookeeper service node with specified host port.
*/
private def delete(args: ServiceControlCliArguments): Unit = {
withZkClient(args.conf) { zkClient =>
withDiscoveryClient(args.conf) { discoveryClient =>
val znodeRoot = getZkNamespace(args)
val hostPortOpt = Some((args.cliArgs.host, args.cliArgs.port.toInt))
val nodesToDelete = getServiceNodes(zkClient, znodeRoot, hostPortOpt)
val nodesToDelete = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)

val deletedNodes = ListBuffer[ServiceNodeInfo]()
nodesToDelete.foreach { node =>
val nodePath = s"$znodeRoot/${node.nodeName}"
info(s"Deleting zookeeper service node:$nodePath")
try {
zkClient.delete().forPath(nodePath)
discoveryClient.delete(nodePath)
deletedNodes += node
} catch {
case e: Exception =>
Expand Down Expand Up @@ -227,18 +223,18 @@ object ServiceControlCli extends CommandLineUtils with Logging {
private[ctl] def getZkNamespace(args: ServiceControlCliArguments): String = {
args.cliArgs.service match {
case ServiceControlObject.SERVER =>
ZKPaths.makePath(null, args.cliArgs.namespace)
DiscoveryClient.makePath(null, args.cliArgs.namespace)
case ServiceControlObject.ENGINE =>
val engineType = Some(args.cliArgs.engineType)
.filter(_ != null).filter(_.nonEmpty)
.getOrElse(args.conf.get(ENGINE_TYPE))
val engineSubdomain = Some(args.cliArgs.engineSubdomain)
.filter(_ != null).filter(_.nonEmpty)
.getOrElse(args.conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse("default"))
ZKPaths.makePath(
DiscoveryClient.makePath(
s"${args.cliArgs.namespace}_${ShareLevel.USER}_${engineType}",
args.cliArgs.user,
engineSubdomain)
Array(engineSubdomain))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ package org.apache.kyuubi.ctl
import java.io.{OutputStream, PrintStream}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_NAMESPACE, HA_ZK_QUORUM}
import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider}
import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo}
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}

trait TestPrematureExit {
Expand Down Expand Up @@ -86,9 +85,8 @@ trait TestPrematureExit {
}

class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
import DiscoveryClientProvider._
import ServiceControlCli._
import ServiceDiscovery._
import ZooKeeperClientProvider._

val zkServer = new EmbeddedZookeeper()
val conf: KyuubiConf = KyuubiConf()
Expand Down Expand Up @@ -226,9 +224,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
System.setProperty(HA_ZK_NAMESPACE.key, uniqueNamespace)

withZkClient(conf) { framework =>
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
withDiscoveryClient(conf) { framework =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")

val newNamespace = getUniqueNamespace()
val args = Array(
Expand All @@ -245,15 +243,15 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {

testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false))
val znodeRoot = s"/$newNamespace"
val children = framework.getChildren.forPath(znodeRoot).asScala.sorted
val children = framework.getChildren(znodeRoot).sorted
assert(children.size == 2)

assert(children.head.startsWith(
s"serviceUri=localhost:10000;version=$KYUUBI_VERSION;sequence="))
assert(children.last.startsWith(
s"serviceUri=localhost:10001;version=$KYUUBI_VERSION;sequence="))
children.foreach { child =>
framework.delete().forPath(s"""$znodeRoot/$child""")
framework.delete(s"""$znodeRoot/$child""")
}
}
}
Expand Down Expand Up @@ -290,9 +288,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)

withZkClient(conf) { framework =>
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
withDiscoveryClient(conf) { framework =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")

val args = Array(
"list",
Expand All @@ -319,9 +317,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)

withZkClient(conf) { framework =>
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
withDiscoveryClient(conf) { framework =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")

val args = Array(
"get",
Expand Down Expand Up @@ -351,10 +349,10 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)

withZkClient(conf) { framework =>
withZkClient(conf) { zc =>
createAndGetServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true)
createAndGetServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true)
withDiscoveryClient(conf) { framework =>
withDiscoveryClient(conf) { zc =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000", external = true)
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001", external = true)
}

val args = Array(
Expand Down Expand Up @@ -385,9 +383,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)

withZkClient(conf) { framework =>
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
withDiscoveryClient(conf) { framework =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")

val args = Array(
"list",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import java.time.Duration
import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry}
import org.apache.kyuubi.ha.client.{RetryPolicies, ZooKeeperAuthTypes}
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.ha.client.RetryPolicies

object HighAvailabilityConf {

Expand Down Expand Up @@ -51,16 +52,16 @@ object HighAvailabilityConf {
val HA_ZK_AUTH_TYPE: ConfigEntry[String] =
buildConf("ha.zookeeper.auth.type")
.doc("The type of zookeeper authentication, all candidates are " +
s"${ZooKeeperAuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
s"${AuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
.version("1.3.2")
.stringConf
.checkValues(ZooKeeperAuthTypes.values.map(_.toString))
.createWithDefault(ZooKeeperAuthTypes.NONE.toString)
.checkValues(AuthTypes.values.map(_.toString))
.createWithDefault(AuthTypes.NONE.toString)

val HA_ZK_ENGINE_AUTH_TYPE: ConfigEntry[String] =
buildConf("ha.zookeeper.engine.auth.type")
.doc("The type of zookeeper authentication for engine, all candidates are " +
s"${ZooKeeperAuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
s"${AuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
.version("1.3.2")
.fallbackConf(HA_ZK_AUTH_TYPE)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.kyuubi.ha.client

object ZooKeeperAuthTypes extends Enumeration {
object AuthTypes extends Enumeration {

type ZooKeeperAuthType = Value
type AuthTypes = Value

val NONE, KERBEROS, DIGEST = Value

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.ha.client

import java.io.IOException

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.client.zookeeper.DiscoveryClient

object DiscoveryClientProvider extends Logging {

/**
* Creates a zookeeper client before calling `f` and close it after calling `f`.
*/
def withDiscoveryClient[T](conf: KyuubiConf)(f: DiscoveryClient => T): T = {
val discoveryClient = new DiscoveryClient(conf)
try {
discoveryClient.createClient()
f(discoveryClient)
} finally {
try {
discoveryClient.closeClient()
} catch {
case e: IOException => error("Failed to release the zkClient", e)
}
}
}

}
Loading

0 comments on commit 278a0f9

Please sign in to comment.