Skip to content

Commit

Permalink
[KYUUBI apache#1989] Decouple curator from other modules
Browse files Browse the repository at this point in the history
  • Loading branch information
hddong committed Apr 12, 2022
1 parent 04c536b commit 2233acc
Show file tree
Hide file tree
Showing 28 changed files with 902 additions and 567 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,18 @@ abstract class ShareLevelSparkEngineSuite
}

test("check discovery service is clean up with different share level") {
withZkClient { zkClient =>
withDiscoveryClient { discoveryClient =>
assert(engine.getServiceState == ServiceState.STARTED)
assert(zkClient.checkExists().forPath(namespace) != null)
assert(discoveryClient.pathExists(namespace))
withJdbcStatement() { _ => }
shareLevel match {
// Connection level, we will cleanup namespace since it's always a global unique value.
case ShareLevel.CONNECTION =>
assert(engine.getServiceState == ServiceState.STOPPED)
assert(zkClient.checkExists().forPath(namespace) == null)
assert(discoveryClient.pathNonExists(namespace))
case _ =>
assert(engine.getServiceState == ServiceState.STARTED)
assert(zkClient.checkExists().forPath(namespace) != null)
assert(discoveryClient.pathExists(namespace))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.kyuubi.engine.spark

import org.apache.curator.framework.CuratorFramework

import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_AUTH_TYPE, HA_ZK_NAMESPACE, HA_ZK_QUORUM}
import org.apache.kyuubi.ha.client.{ZooKeeperAuthTypes, ZooKeeperClientProvider}
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.ha.client.DiscoveryClient
import org.apache.kyuubi.ha.client.DiscoveryClientProvider
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}

trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
Expand All @@ -32,7 +32,7 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
assert(zkServer != null)
Map(
HA_ZK_QUORUM.key -> zkServer.getConnectString,
HA_ZK_AUTH_TYPE.key -> ZooKeeperAuthTypes.NONE.toString,
HA_ZK_AUTH_TYPE.key -> AuthTypes.NONE.toString,
HA_ZK_NAMESPACE.key -> namespace)
}

Expand Down Expand Up @@ -62,8 +62,8 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
stopSparkEngine()
}

def withZkClient(f: CuratorFramework => Unit): Unit = {
ZooKeeperClientProvider.withZkClient(kyuubiConf)(f)
def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = {
DiscoveryClientProvider.withDiscoveryClient(kyuubiConf)(f)
}

protected def getDiscoveryConnectionString: String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ package org.apache.kyuubi.ctl

import scala.collection.mutable.ListBuffer

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.utils.ZKPaths

import org.apache.kyuubi.{KYUUBI_VERSION, Logging}
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN
import org.apache.kyuubi.config.KyuubiConf.ENGINE_TYPE
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider}
import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo}
import org.apache.kyuubi.ha.client.DiscoveryClient
import org.apache.kyuubi.ha.client.DiscoveryPaths

private[ctl] object ServiceControlAction extends Enumeration {
type ServiceControlAction = Value
Expand All @@ -43,9 +42,8 @@ private[ctl] object ServiceControlObject extends Enumeration {
* Main gateway of launching a Kyuubi Ctl action.
*/
private[kyuubi] class ServiceControlCli extends Logging {
import DiscoveryClientProvider._
import ServiceControlCli._
import ServiceDiscovery._
import ZooKeeperClientProvider._

private var verbose: Boolean = false

Expand Down Expand Up @@ -84,21 +82,20 @@ private[kyuubi] class ServiceControlCli extends Logging {
val kyuubiConf = args.conf

kyuubiConf.setIfMissing(HA_ZK_QUORUM, args.cliArgs.zkQuorum)
withZkClient(kyuubiConf) { zkClient =>
val fromNamespace = ZKPaths.makePath(null, kyuubiConf.get(HA_ZK_NAMESPACE))
withDiscoveryClient(kyuubiConf) { discoveryClient =>
val fromNamespace = DiscoveryPaths.makePath(null, kyuubiConf.get(HA_ZK_NAMESPACE))
val toNamespace = getZkNamespace(args)

val currentServerNodes = getServiceNodesInfo(zkClient, fromNamespace)
val currentServerNodes = discoveryClient.getServiceNodesInfo(fromNamespace)
val exposedServiceNodes = ListBuffer[ServiceNodeInfo]()

if (currentServerNodes.nonEmpty) {
def doCreate(zc: CuratorFramework): Unit = {
def doCreate(zc: DiscoveryClient): Unit = {
currentServerNodes.foreach { sn =>
info(s"Exposing server instance:${sn.instance} with version:${sn.version}" +
s" from $fromNamespace to $toNamespace")
val newNodePath = createAndGetServiceNode(
val newNodePath = zc.createAndGetServiceNode(
kyuubiConf,
zc,
args.cliArgs.namespace,
sn.instance,
sn.version,
Expand All @@ -110,10 +107,10 @@ private[kyuubi] class ServiceControlCli extends Logging {
}

if (kyuubiConf.get(HA_ZK_QUORUM) == args.cliArgs.zkQuorum) {
doCreate(zkClient)
doCreate(discoveryClient)
} else {
kyuubiConf.set(HA_ZK_QUORUM, args.cliArgs.zkQuorum)
withZkClient(kyuubiConf)(doCreate)
withDiscoveryClient(kyuubiConf)(doCreate)
}
}

Expand All @@ -126,24 +123,24 @@ private[kyuubi] class ServiceControlCli extends Logging {
* List Kyuubi server nodes info.
*/
private def list(args: ServiceControlCliArguments, filterHostPort: Boolean): Unit = {
withZkClient(args.conf) { zkClient =>
withDiscoveryClient(args.conf) { discoveryClient =>
val znodeRoot = getZkNamespace(args)
val hostPortOpt =
if (filterHostPort) {
Some((args.cliArgs.host, args.cliArgs.port.toInt))
} else None
val nodes = getServiceNodes(zkClient, znodeRoot, hostPortOpt)
val nodes = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)

val title = "Zookeeper service nodes"
info(renderServiceNodesInfo(title, nodes, verbose))
}
}

private def getServiceNodes(
zkClient: CuratorFramework,
discoveryClient: DiscoveryClient,
znodeRoot: String,
hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = {
val serviceNodes = getServiceNodesInfo(zkClient, znodeRoot)
val serviceNodes = discoveryClient.getServiceNodesInfo(znodeRoot)
hostPortOpt match {
case Some((host, port)) => serviceNodes.filter { sn =>
sn.host == host && sn.port == port
Expand All @@ -156,17 +153,17 @@ private[kyuubi] class ServiceControlCli extends Logging {
* Delete zookeeper service node with specified host port.
*/
private def delete(args: ServiceControlCliArguments): Unit = {
withZkClient(args.conf) { zkClient =>
withDiscoveryClient(args.conf) { discoveryClient =>
val znodeRoot = getZkNamespace(args)
val hostPortOpt = Some((args.cliArgs.host, args.cliArgs.port.toInt))
val nodesToDelete = getServiceNodes(zkClient, znodeRoot, hostPortOpt)
val nodesToDelete = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)

val deletedNodes = ListBuffer[ServiceNodeInfo]()
nodesToDelete.foreach { node =>
val nodePath = s"$znodeRoot/${node.nodeName}"
info(s"Deleting zookeeper service node:$nodePath")
try {
zkClient.delete().forPath(nodePath)
discoveryClient.delete(nodePath)
deletedNodes += node
} catch {
case e: Exception =>
Expand Down Expand Up @@ -227,7 +224,7 @@ object ServiceControlCli extends CommandLineUtils with Logging {
private[ctl] def getZkNamespace(args: ServiceControlCliArguments): String = {
args.cliArgs.service match {
case ServiceControlObject.SERVER =>
ZKPaths.makePath(null, args.cliArgs.namespace)
DiscoveryPaths.makePath(null, args.cliArgs.namespace)
case ServiceControlObject.ENGINE =>
val engineType = Some(args.cliArgs.engineType)
.filter(_ != null).filter(_.nonEmpty)
Expand All @@ -237,10 +234,10 @@ object ServiceControlCli extends CommandLineUtils with Logging {
.getOrElse(args.conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse("default"))
// The path of the engine defined in zookeeper comes from
// org.apache.kyuubi.engine.EngineRef#engineSpace
ZKPaths.makePath(
DiscoveryPaths.makePath(
s"${args.cliArgs.namespace}_${KYUUBI_VERSION}_${ShareLevel.USER}_${engineType}",
args.cliArgs.user,
engineSubdomain)
Array(engineSubdomain))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ package org.apache.kyuubi.ctl
import java.io.{OutputStream, PrintStream}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_NAMESPACE, HA_ZK_QUORUM}
import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider}
import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo}
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}

trait TestPrematureExit {
Expand Down Expand Up @@ -86,9 +85,8 @@ trait TestPrematureExit {
}

class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
import DiscoveryClientProvider._
import ServiceControlCli._
import ServiceDiscovery._
import ZooKeeperClientProvider._

val zkServer = new EmbeddedZookeeper()
val conf: KyuubiConf = KyuubiConf()
Expand Down Expand Up @@ -226,9 +224,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
System.setProperty(HA_ZK_NAMESPACE.key, uniqueNamespace)

withZkClient(conf) { framework =>
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
withDiscoveryClient(conf) { framework =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")

val newNamespace = getUniqueNamespace()
val args = Array(
Expand All @@ -245,15 +243,15 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {

testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false))
val znodeRoot = s"/$newNamespace"
val children = framework.getChildren.forPath(znodeRoot).asScala.sorted
val children = framework.getChildren(znodeRoot).sorted
assert(children.size == 2)

assert(children.head.startsWith(
s"serviceUri=localhost:10000;version=$KYUUBI_VERSION;sequence="))
assert(children.last.startsWith(
s"serviceUri=localhost:10001;version=$KYUUBI_VERSION;sequence="))
children.foreach { child =>
framework.delete().forPath(s"""$znodeRoot/$child""")
framework.delete(s"""$znodeRoot/$child""")
}
}
}
Expand Down Expand Up @@ -290,9 +288,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)

withZkClient(conf) { framework =>
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
withDiscoveryClient(conf) { framework =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")

val args = Array(
"list",
Expand All @@ -319,9 +317,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)

withZkClient(conf) { framework =>
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
withDiscoveryClient(conf) { framework =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")

val args = Array(
"get",
Expand Down Expand Up @@ -351,10 +349,10 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)

withZkClient(conf) { framework =>
withZkClient(conf) { zc =>
createAndGetServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true)
createAndGetServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true)
withDiscoveryClient(conf) { framework =>
withDiscoveryClient(conf) { zc =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000", external = true)
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001", external = true)
}

val args = Array(
Expand Down Expand Up @@ -385,9 +383,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)

withZkClient(conf) { framework =>
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
withDiscoveryClient(conf) { framework =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")

val args = Array(
"list",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import java.time.Duration
import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry}
import org.apache.kyuubi.ha.client.{RetryPolicies, ZooKeeperAuthTypes}
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.ha.client.RetryPolicies

object HighAvailabilityConf {

Expand Down Expand Up @@ -51,16 +52,16 @@ object HighAvailabilityConf {
val HA_ZK_AUTH_TYPE: ConfigEntry[String] =
buildConf("kyuubi.ha.zookeeper.auth.type")
.doc("The type of zookeeper authentication, all candidates are " +
s"${ZooKeeperAuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
s"${AuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
.version("1.3.2")
.stringConf
.checkValues(ZooKeeperAuthTypes.values.map(_.toString))
.createWithDefault(ZooKeeperAuthTypes.NONE.toString)
.checkValues(AuthTypes.values.map(_.toString))
.createWithDefault(AuthTypes.NONE.toString)

val HA_ZK_ENGINE_AUTH_TYPE: ConfigEntry[String] =
buildConf("kyuubi.ha.zookeeper.engine.auth.type")
.doc("The type of zookeeper authentication for engine, all candidates are " +
s"${ZooKeeperAuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
s"${AuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
.version("1.3.2")
.fallbackConf(HA_ZK_AUTH_TYPE)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.kyuubi.ha.client

object ZooKeeperAuthTypes extends Enumeration {
object AuthTypes extends Enumeration {

type ZooKeeperAuthType = Value
type AuthTypes = Value

val NONE, KERBEROS, DIGEST = Value

Expand Down
Loading

0 comments on commit 2233acc

Please sign in to comment.