Skip to content

Commit

Permalink
[KYUUBI #1989] Decouple curator from other modules
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

Decouple curator from other modules.

Mainly changes:
1. `ZookeeperClientProvider` -> `zookeeper.ClientProvider`
2. `zkClient` apis -> `DiscoveryClient`
3. `ZooKeeperClientProvider.withZkClient` -> `DiscoveryClientProvider.withDiscoveryClient`

### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1991 from hddong/decuple-discovery.

Closes #1989

3efcbbf [hongdongdong] fix
2233acc [hongdongdong] [KYUUBI #1989] Decouple curator from other modules

Authored-by: hongdongdong <hongdongdong@cmss.chinamobile.com>
Signed-off-by: Kent Yao <yao@apache.org>
  • Loading branch information
hddong authored and yaooqinn committed Apr 13, 2022
1 parent a63e811 commit 9101068
Show file tree
Hide file tree
Showing 28 changed files with 906 additions and 571 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,36 +50,36 @@ abstract class ShareLevelSparkEngineSuite
}

test("check discovery service is clean up with different share level") {
withZkClient { zkClient =>
withDiscoveryClient { discoveryClient =>
assert(engine.getServiceState == ServiceState.STARTED)
assert(zkClient.checkExists().forPath(namespace) != null)
assert(discoveryClient.pathExists(namespace))
withJdbcStatement() { _ => }
shareLevel match {
// Connection level, we will cleanup namespace since it's always a global unique value.
case ShareLevel.CONNECTION =>
assert(engine.getServiceState == ServiceState.STOPPED)
assert(zkClient.checkExists().forPath(namespace) == null)
assert(discoveryClient.pathNonExists(namespace))
case _ =>
assert(engine.getServiceState == ServiceState.STARTED)
assert(zkClient.checkExists().forPath(namespace) != null)
assert(discoveryClient.pathExists(namespace))
}
}
}

test("test spark engine max life-time") {
withZkClient { zkClient =>
withDiscoveryClient { discoveryClient =>
assert(engine.getServiceState == ServiceState.STARTED)
assert(zkClient.checkExists().forPath(namespace) != null)
assert(discoveryClient.pathExists(namespace))
withJdbcStatement() { _ => }

eventually(Timeout(30.seconds)) {
shareLevel match {
case ShareLevel.CONNECTION =>
assert(engine.getServiceState == ServiceState.STOPPED)
assert(zkClient.checkExists().forPath(namespace) == null)
assert(discoveryClient.pathNonExists(namespace))
case _ =>
assert(engine.getServiceState == ServiceState.STOPPED)
assert(zkClient.checkExists().forPath(namespace) != null)
assert(discoveryClient.pathExists(namespace))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.kyuubi.engine.spark

import org.apache.curator.framework.CuratorFramework

import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_AUTH_TYPE, HA_ZK_NAMESPACE, HA_ZK_QUORUM}
import org.apache.kyuubi.ha.client.{ZooKeeperAuthTypes, ZooKeeperClientProvider}
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.ha.client.DiscoveryClient
import org.apache.kyuubi.ha.client.DiscoveryClientProvider
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}

trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
Expand All @@ -32,7 +32,7 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
assert(zkServer != null)
Map(
HA_ZK_QUORUM.key -> zkServer.getConnectString,
HA_ZK_AUTH_TYPE.key -> ZooKeeperAuthTypes.NONE.toString,
HA_ZK_AUTH_TYPE.key -> AuthTypes.NONE.toString,
HA_ZK_NAMESPACE.key -> namespace)
}

Expand Down Expand Up @@ -62,8 +62,8 @@ trait WithDiscoverySparkSQLEngine extends WithSparkSQLEngine {
stopSparkEngine()
}

def withZkClient(f: CuratorFramework => Unit): Unit = {
ZooKeeperClientProvider.withZkClient(kyuubiConf)(f)
def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = {
DiscoveryClientProvider.withDiscoveryClient(kyuubiConf)(f)
}

protected def getDiscoveryConnectionString: String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ package org.apache.kyuubi.ctl

import scala.collection.mutable.ListBuffer

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.utils.ZKPaths

import org.apache.kyuubi.{KYUUBI_VERSION, Logging}
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN
import org.apache.kyuubi.config.KyuubiConf.ENGINE_TYPE
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider}
import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo}
import org.apache.kyuubi.ha.client.DiscoveryClient
import org.apache.kyuubi.ha.client.DiscoveryPaths

private[ctl] object ServiceControlAction extends Enumeration {
type ServiceControlAction = Value
Expand All @@ -43,9 +42,8 @@ private[ctl] object ServiceControlObject extends Enumeration {
* Main gateway of launching a Kyuubi Ctl action.
*/
private[kyuubi] class ServiceControlCli extends Logging {
import DiscoveryClientProvider._
import ServiceControlCli._
import ServiceDiscovery._
import ZooKeeperClientProvider._

private var verbose: Boolean = false

Expand Down Expand Up @@ -84,21 +82,20 @@ private[kyuubi] class ServiceControlCli extends Logging {
val kyuubiConf = args.conf

kyuubiConf.setIfMissing(HA_ZK_QUORUM, args.cliArgs.zkQuorum)
withZkClient(kyuubiConf) { zkClient =>
val fromNamespace = ZKPaths.makePath(null, kyuubiConf.get(HA_ZK_NAMESPACE))
withDiscoveryClient(kyuubiConf) { discoveryClient =>
val fromNamespace = DiscoveryPaths.makePath(null, kyuubiConf.get(HA_ZK_NAMESPACE))
val toNamespace = getZkNamespace(args)

val currentServerNodes = getServiceNodesInfo(zkClient, fromNamespace)
val currentServerNodes = discoveryClient.getServiceNodesInfo(fromNamespace)
val exposedServiceNodes = ListBuffer[ServiceNodeInfo]()

if (currentServerNodes.nonEmpty) {
def doCreate(zc: CuratorFramework): Unit = {
def doCreate(zc: DiscoveryClient): Unit = {
currentServerNodes.foreach { sn =>
info(s"Exposing server instance:${sn.instance} with version:${sn.version}" +
s" from $fromNamespace to $toNamespace")
val newNodePath = createAndGetServiceNode(
val newNodePath = zc.createAndGetServiceNode(
kyuubiConf,
zc,
args.cliArgs.namespace,
sn.instance,
sn.version,
Expand All @@ -110,10 +107,10 @@ private[kyuubi] class ServiceControlCli extends Logging {
}

if (kyuubiConf.get(HA_ZK_QUORUM) == args.cliArgs.zkQuorum) {
doCreate(zkClient)
doCreate(discoveryClient)
} else {
kyuubiConf.set(HA_ZK_QUORUM, args.cliArgs.zkQuorum)
withZkClient(kyuubiConf)(doCreate)
withDiscoveryClient(kyuubiConf)(doCreate)
}
}

Expand All @@ -126,24 +123,24 @@ private[kyuubi] class ServiceControlCli extends Logging {
* List Kyuubi server nodes info.
*/
private def list(args: ServiceControlCliArguments, filterHostPort: Boolean): Unit = {
withZkClient(args.conf) { zkClient =>
withDiscoveryClient(args.conf) { discoveryClient =>
val znodeRoot = getZkNamespace(args)
val hostPortOpt =
if (filterHostPort) {
Some((args.cliArgs.host, args.cliArgs.port.toInt))
} else None
val nodes = getServiceNodes(zkClient, znodeRoot, hostPortOpt)
val nodes = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)

val title = "Zookeeper service nodes"
info(renderServiceNodesInfo(title, nodes, verbose))
}
}

private def getServiceNodes(
zkClient: CuratorFramework,
discoveryClient: DiscoveryClient,
znodeRoot: String,
hostPortOpt: Option[(String, Int)]): Seq[ServiceNodeInfo] = {
val serviceNodes = getServiceNodesInfo(zkClient, znodeRoot)
val serviceNodes = discoveryClient.getServiceNodesInfo(znodeRoot)
hostPortOpt match {
case Some((host, port)) => serviceNodes.filter { sn =>
sn.host == host && sn.port == port
Expand All @@ -156,17 +153,17 @@ private[kyuubi] class ServiceControlCli extends Logging {
* Delete zookeeper service node with specified host port.
*/
private def delete(args: ServiceControlCliArguments): Unit = {
withZkClient(args.conf) { zkClient =>
withDiscoveryClient(args.conf) { discoveryClient =>
val znodeRoot = getZkNamespace(args)
val hostPortOpt = Some((args.cliArgs.host, args.cliArgs.port.toInt))
val nodesToDelete = getServiceNodes(zkClient, znodeRoot, hostPortOpt)
val nodesToDelete = getServiceNodes(discoveryClient, znodeRoot, hostPortOpt)

val deletedNodes = ListBuffer[ServiceNodeInfo]()
nodesToDelete.foreach { node =>
val nodePath = s"$znodeRoot/${node.nodeName}"
info(s"Deleting zookeeper service node:$nodePath")
try {
zkClient.delete().forPath(nodePath)
discoveryClient.delete(nodePath)
deletedNodes += node
} catch {
case e: Exception =>
Expand Down Expand Up @@ -227,7 +224,7 @@ object ServiceControlCli extends CommandLineUtils with Logging {
private[ctl] def getZkNamespace(args: ServiceControlCliArguments): String = {
args.cliArgs.service match {
case ServiceControlObject.SERVER =>
ZKPaths.makePath(null, args.cliArgs.namespace)
DiscoveryPaths.makePath(null, args.cliArgs.namespace)
case ServiceControlObject.ENGINE =>
val engineType = Some(args.cliArgs.engineType)
.filter(_ != null).filter(_.nonEmpty)
Expand All @@ -237,10 +234,10 @@ object ServiceControlCli extends CommandLineUtils with Logging {
.getOrElse(args.conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse("default"))
// The path of the engine defined in zookeeper comes from
// org.apache.kyuubi.engine.EngineRef#engineSpace
ZKPaths.makePath(
DiscoveryPaths.makePath(
s"${args.cliArgs.namespace}_${KYUUBI_VERSION}_${ShareLevel.USER}_${engineType}",
args.cliArgs.user,
engineSubdomain)
Array(engineSubdomain))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ package org.apache.kyuubi.ctl
import java.io.{OutputStream, PrintStream}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_NAMESPACE, HA_ZK_QUORUM}
import org.apache.kyuubi.ha.client.{ServiceDiscovery, ServiceNodeInfo, ZooKeeperClientProvider}
import org.apache.kyuubi.ha.client.{DiscoveryClientProvider, ServiceNodeInfo}
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}

trait TestPrematureExit {
Expand Down Expand Up @@ -86,9 +85,8 @@ trait TestPrematureExit {
}

class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
import DiscoveryClientProvider._
import ServiceControlCli._
import ServiceDiscovery._
import ZooKeeperClientProvider._

val zkServer = new EmbeddedZookeeper()
val conf: KyuubiConf = KyuubiConf()
Expand Down Expand Up @@ -226,9 +224,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
System.setProperty(HA_ZK_NAMESPACE.key, uniqueNamespace)

withZkClient(conf) { framework =>
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
withDiscoveryClient(conf) { framework =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")

val newNamespace = getUniqueNamespace()
val args = Array(
Expand All @@ -245,15 +243,15 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {

testPrematureExit(args, getRenderedNodesInfoWithoutTitle(expectedCreatedNodes, false))
val znodeRoot = s"/$newNamespace"
val children = framework.getChildren.forPath(znodeRoot).asScala.sorted
val children = framework.getChildren(znodeRoot).sorted
assert(children.size == 2)

assert(children.head.startsWith(
s"serviceUri=localhost:10000;version=$KYUUBI_VERSION;sequence="))
assert(children.last.startsWith(
s"serviceUri=localhost:10001;version=$KYUUBI_VERSION;sequence="))
children.foreach { child =>
framework.delete().forPath(s"""$znodeRoot/$child""")
framework.delete(s"""$znodeRoot/$child""")
}
}
}
Expand Down Expand Up @@ -290,9 +288,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)

withZkClient(conf) { framework =>
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
withDiscoveryClient(conf) { framework =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")

val args = Array(
"list",
Expand All @@ -319,9 +317,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)

withZkClient(conf) { framework =>
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
withDiscoveryClient(conf) { framework =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")

val args = Array(
"get",
Expand Down Expand Up @@ -351,10 +349,10 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)

withZkClient(conf) { framework =>
withZkClient(conf) { zc =>
createAndGetServiceNode(conf, zc, uniqueNamespace, "localhost:10000", external = true)
createAndGetServiceNode(conf, zc, uniqueNamespace, "localhost:10001", external = true)
withDiscoveryClient(conf) { framework =>
withDiscoveryClient(conf) { zc =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000", external = true)
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001", external = true)
}

val args = Array(
Expand Down Expand Up @@ -385,9 +383,9 @@ class ServiceControlCliSuite extends KyuubiFunSuite with TestPrematureExit {
.set(HA_ZK_NAMESPACE, uniqueNamespace)
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)

withZkClient(conf) { framework =>
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10000")
createAndGetServiceNode(conf, framework, uniqueNamespace, "localhost:10001")
withDiscoveryClient(conf) { framework =>
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10000")
framework.createAndGetServiceNode(conf, uniqueNamespace, "localhost:10001")

val args = Array(
"list",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import java.time.Duration
import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi.config.{ConfigBuilder, ConfigEntry, KyuubiConf, OptionalConfigEntry}
import org.apache.kyuubi.ha.client.{RetryPolicies, ZooKeeperAuthTypes}
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.ha.client.RetryPolicies

object HighAvailabilityConf {

Expand Down Expand Up @@ -51,16 +52,16 @@ object HighAvailabilityConf {
val HA_ZK_AUTH_TYPE: ConfigEntry[String] =
buildConf("kyuubi.ha.zookeeper.auth.type")
.doc("The type of zookeeper authentication, all candidates are " +
s"${ZooKeeperAuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
s"${AuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
.version("1.3.2")
.stringConf
.checkValues(ZooKeeperAuthTypes.values.map(_.toString))
.createWithDefault(ZooKeeperAuthTypes.NONE.toString)
.checkValues(AuthTypes.values.map(_.toString))
.createWithDefault(AuthTypes.NONE.toString)

val HA_ZK_ENGINE_AUTH_TYPE: ConfigEntry[String] =
buildConf("kyuubi.ha.zookeeper.engine.auth.type")
.doc("The type of zookeeper authentication for engine, all candidates are " +
s"${ZooKeeperAuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
s"${AuthTypes.values.mkString("<ul><li>", "</li><li> ", "</li></ul>")}")
.version("1.3.2")
.fallbackConf(HA_ZK_AUTH_TYPE)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.kyuubi.ha.client

object ZooKeeperAuthTypes extends Enumeration {
object AuthTypes extends Enumeration {

type ZooKeeperAuthType = Value
type AuthTypes = Value

val NONE, KERBEROS, DIGEST = Value

Expand Down
Loading

0 comments on commit 9101068

Please sign in to comment.