Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #5867] HiveEngine support run on YARN mode #5868

Closed
wants to merge 12 commits into from
10 changes: 10 additions & 0 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ class HiveSQLEngine extends Serverable("HiveSQLEngine") {
super.start()
// Start engine self-terminating checker after all services are ready and it can be reached by
// all servers in engine spaces.
backendService.sessionManager.startTerminatingChecker(() => stop())
backendService.sessionManager.startTerminatingChecker(() => {
selfExist = true
Copy link
Contributor

@zhouyifan279 zhouyifan279 Dec 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe selfExited ?

stop()
})
}

override protected def stopServer(): Unit = {
Expand Down Expand Up @@ -151,7 +154,8 @@ object HiveSQLEngine extends Logging {
}

} catch {
case t: Throwable => currentEngine match {
case t: Throwable =>
currentEngine match {
case Some(engine) =>
engine.stop()
val event = HiveEngineEvent(engine)
Expand All @@ -160,6 +164,7 @@ object HiveSQLEngine extends Logging {
case _ =>
error(s"Failed to start Hive SQL engine: ${t.getMessage}.", t)
}
throw t
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.hive.deploy

import java.io.File

import scala.collection.mutable.ListBuffer

import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf.ENGINE_HIVE_EXTRA_CLASSPATH
import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter
import org.apache.kyuubi.engine.hive.HiveSQLEngine

object HiveYarnModeSubmitter extends EngineYarnModeSubmitter {

def main(args: Array[String]): Unit = {
Utils.fromCommandLineArgs(args, kyuubiConf)
submitApplication()
}

override var engineType: String = "hive"

override def engineMainClass(): String = HiveSQLEngine.getClass.getName

/**
* Jar list for the Hive engine.
*/
override def engineExtraJars(): Seq[File] = {
val hadoopCp = sys.env.get("HIVE_HADOOP_CLASSPATH")
val extraCp = kyuubiConf.get(ENGINE_HIVE_EXTRA_CLASSPATH)
val jars = new ListBuffer[File]
hadoopCp.foreach(cp => parseClasspath(cp, jars))
extraCp.foreach(cp => parseClasspath(cp, jars))
jars.toSeq
}

private[hive] def parseClasspath(classpath: String, jars: ListBuffer[File]): Unit = {
classpath.split(":").filter(_.nonEmpty).foreach { cp =>
if (cp.endsWith("/*")) {
val dir = cp.substring(0, cp.length - 2)
new File(dir) match {
case f if f.isDirectory =>
f.listFiles().filter(_.getName.endsWith(".jar")).foreach(jars += _)
case _ =>
}
} else {
jars += new File(cp)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.hive.deploy

import java.io.File

import scala.collection.mutable.ListBuffer

import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, SCALA_COMPILE_VERSION, Utils}

class HiveYarnModeSubmitterSuite extends KyuubiFunSuite {
val hiveEngineHome: String = Utils.getCodeSourceLocation(getClass).split("/target")(0)

test("hadoop class path") {
val jars = new ListBuffer[File]
val classpath =
s"$hiveEngineHome/target/scala-$SCALA_COMPILE_VERSION/jars/*:" +
s"$hiveEngineHome/target/kyuubi-hive-sql-engine-$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"
HiveYarnModeSubmitter.parseClasspath(classpath, jars)
assert(jars.nonEmpty)
assert(jars.exists(
_.getName == s"kyuubi-hive-sql-engine-$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"))
}

}
31 changes: 31 additions & 0 deletions integration-tests/kyuubi-hive-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,37 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- YARN -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.activation</groupId>
<artifactId>jakarta.activation-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.it.hive.operation

import org.apache.kyuubi.{HiveEngineTests, Utils, WithKyuubiServerAndHadoopMiniCluster}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_IDLE_TIMEOUT, ENGINE_TYPE, KYUUBI_ENGINE_ENV_PREFIX, KYUUBI_HOME}
import org.apache.kyuubi.engine.deploy.DeployMode

class KyuubiOperationHiveEngineYarnModeSuite extends HiveEngineTests
with WithKyuubiServerAndHadoopMiniCluster {

override protected val conf: KyuubiConf = {
val metastore = Utils.createTempDir(prefix = getClass.getSimpleName)
metastore.toFile.delete()
KyuubiConf()
.set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
.set(ENGINE_TYPE, "HIVE_SQL")
.set(KyuubiConf.ENGINE_HIVE_DEPLOY_MODE, DeployMode.YARN.toString)
// increase this to 30s as hive session state and metastore client is slow initializing
.setIfMissing(ENGINE_IDLE_TIMEOUT, 30000L)
.set("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastore;create=true")
}

override def beforeAll(): Unit = {
super.beforeAll()
conf
.set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_MEMORY, Math.min(getYarnMaximumAllocationMb, 1024))
.set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_CORES, 1)
}

override protected def jdbcUrl: String = getJdbcUrl
}
11 changes: 11 additions & 0 deletions kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ object Utils extends Logging {
dir
}

/**
* List the files recursively in a directory.
*/
def listFilesRecursively(file: File): Seq[File] = {
if (!file.isDirectory) {
file :: Nil
} else {
file.listFiles().flatMap(listFilesRecursively)
}
}

/**
* Copies bytes from an InputStream source to a newly created temporary file
* created in the directory destination. The temporary file will be created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import scala.util.matching.Regex
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.{EngineType, ShareLevel}
import org.apache.kyuubi.engine.deploy.DeployMode
import org.apache.kyuubi.operation.{NoneMode, PlainStyle}
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}

Expand Down Expand Up @@ -231,6 +232,7 @@ object KyuubiConf {
final val KYUUBI_CONF_FILE_NAME = "kyuubi-defaults.conf"
final val KYUUBI_HOME = "KYUUBI_HOME"
final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv"
final val KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX = "kyuubi.engine.yarn.AMEnv"
final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf"
final val KYUUBI_KUBERNETES_CONF_PREFIX = "kyuubi.kubernetes"
final val USER_DEFAULTS_CONF_QUOTE = "___"
Expand Down Expand Up @@ -2693,6 +2695,85 @@ object KyuubiConf {
.stringConf
.createOptional

val ENGINE_HIVE_DEPLOY_MODE: ConfigEntry[String] =
buildConf("kyuubi.engine.hive.deploy.mode")
.doc("Configures the hive engine deploy mode, The value can be 'local', 'yarn'. " +
"In local mode, the engine operates on the same node as the KyuubiServer. " +
"In YARN mode, the engine runs within the Application Master (AM) container of YARN. ")
.version("1.9.0")
.stringConf
.transformToUpperCase
.checkValue(
mode => Set("LOCAL", "YARN").contains(mode),
"Invalid value for 'kyuubi.engine.hive.deploy.mode'. Valid values are 'local', 'yarn'.")
.createWithDefault(DeployMode.LOCAL.toString)

val ENGINE_DEPLOY_YARN_MODE_STAGING_DIR: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.yarn.stagingDir")
.doc("Staging directory used while submitting kyuubi engine to YARN, " +
"It should be a absolute path in HDFS.")
.version("1.9.0")
.stringConf
.createOptional

val ENGINE_DEPLOY_YARN_MODE_REPORT_INTERVAL: ConfigEntry[Long] =
buildConf("kyuubi.engine.yarn.report.interval")
.doc("Interval between reports of the current engine on yarn app status.")
.version("1.9.0")
.timeConf
.checkValue(t => t > 0, "must be positive integer")
.createWithDefault(Duration.ofSeconds(1).toMillis)

val ENGINE_DEPLOY_YARN_MODE_TAGS: OptionalConfigEntry[Seq[String]] =
buildConf("kyuubi.engine.yarn.tags")
.doc(s"kyuubi engine yarn tags when the engine deploy mode is YARN.")
.version("1.9.0")
.stringConf
.toSequence()
.createOptional

val ENGINE_DEPLOY_YARN_MODE_QUEUE: ConfigEntry[String] =
buildConf("kyuubi.engine.yarn.queue")
.doc(s"kyuubi engine yarn queue when the engine deploy mode is YARN.")
.version("1.9.0")
.stringConf
.createWithDefault("default")

val ENGINE_DEPLOY_YARN_MODE_PRIORITY: OptionalConfigEntry[Int] =
buildConf("kyuubi.engine.yarn.priority")
.doc(s"kyuubi engine yarn priority when the engine deploy mode is YARN.")
.version("1.9.0")
.intConf
.createOptional

val ENGINE_DEPLOY_YARN_MODE_APP_NAME: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.yarn.app.name")
.doc(s"The YARN app name when the engine deploy mode is YARN.")
.version("1.9.0")
.stringConf
.createOptional

val ENGINE_DEPLOY_YARN_MODE_MEMORY: ConfigEntry[Int] =
buildConf("kyuubi.engine.yarn.memory")
.doc(s"kyuubi engine container memory in mb when the engine deploy mode is YARN.")
.version("1.9.0")
.intConf
.createWithDefault(1024)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use human-readable format? e.g. 1g, 1024m

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, plan support bytesConf in the follow up PR


val ENGINE_DEPLOY_YARN_MODE_CORES: ConfigEntry[Int] =
buildConf("kyuubi.engine.yarn.cores")
.doc(s"kyuubi engine container core number when the engine deploy mode is YARN.")
.version("1.9.0")
.intConf
.createWithDefault(1)

val ENGINE_DEPLOY_YARN_MODE_JAVA_OPTIONS: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.yarn.java.options")
.doc(s"The extra Java options for the AM when the engine deploy mode is YARN.")
.version("1.9.0")
.stringConf
.createOptional

val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
buildConf("kyuubi.engine.flink.memory")
.doc("The heap memory for the Flink SQL engine. Only effective in yarn session mode.")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.deploy

object DeployMode extends Enumeration {
type DeployMode = Value
val
/**
* In this mode, the engine will be launched locally.
*/
LOCAL,
/**
* In this mode, the engine will be launched on YARN.
*/
YARN,
/**
* In this mode, the engine will be launched on Kubernetes.
*/
KUBERNETES = Value
}
Loading
Loading