Skip to content

Commit

Permalink
HiveEngine support run on YARN mode
Browse files Browse the repository at this point in the history
  • Loading branch information
yikf committed Dec 26, 2023
1 parent c6177ab commit 98ff19c
Show file tree
Hide file tree
Showing 22 changed files with 1,310 additions and 15 deletions.
9 changes: 9 additions & 0 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ class HiveSQLEngine extends Serverable("HiveSQLEngine") {
super.start()
// Start engine self-terminating checker after all services are ready and it can be reached by
// all servers in engine spaces.
backendService.sessionManager.startTerminatingChecker(() => stop())
backendService.sessionManager.startTerminatingChecker(() => {
selfExist = true
stop()
})
}

override protected def stopServer(): Unit = {
Expand Down Expand Up @@ -151,7 +154,8 @@ object HiveSQLEngine extends Logging {
}

} catch {
case t: Throwable => currentEngine match {
case t: Throwable =>
currentEngine match {
case Some(engine) =>
engine.stop()
val event = HiveEngineEvent(engine)
Expand All @@ -160,6 +164,7 @@ object HiveSQLEngine extends Logging {
case _ =>
error(s"Failed to start Hive SQL engine: ${t.getMessage}.", t)
}
throw t
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.hive.deploy

import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.util.KyuubiHadoopUtils

/**
 * Entry point that submits the Hive SQL engine as a YARN application, built on top of
 * [[EngineYarnModeSubmitter]].
 */
object HiveYarnModeSubmitter extends EngineYarnModeSubmitter {

  /** Engine type handled by this submitter. */
  override var engineType: String = "hive"

  /**
   * Name of the class to run as the engine's main class.
   *
   * `HiveSQLEngine` is referenced here as a Scala object, so this is the name of the
   * object's runtime class (for Scala objects that name ends with `$`).
   */
  override def engineMainClass(): String = HiveSQLEngine.getClass.getName

  /**
   * Reads the command-line arguments into `kyuubiConf`, prepares the YARN and Hadoop
   * configurations, and then submits the engine application to YARN.
   *
   * @param args command-line arguments handed to `Utils.fromCommandLineArgs`
   */
  def main(args: Array[String]): Unit = {
    Utils.fromCommandLineArgs(args, kyuubiConf)
    prepareConfigurations()
    submitApplication()
  }

  /**
   * Builds `yarnConf` and `hadoopConf` from the current `kyuubiConf`. Must run after the
   * command-line arguments have been applied to `kyuubiConf`.
   */
  private def prepareConfigurations(): Unit = {
    yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf)
    hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf)
  }
}
31 changes: 31 additions & 0 deletions integration-tests/kyuubi-hive-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,37 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- YARN -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.activation</groupId>
<artifactId>jakarta.activation-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.it.hive.operation

import org.apache.kyuubi.{HiveEngineTests, Utils, WithKyuubiServerAndHadoopMiniCluster}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_IDLE_TIMEOUT, ENGINE_TYPE, KYUUBI_ENGINE_ENV_PREFIX, KYUUBI_HOME}
import org.apache.kyuubi.engine.deploy.YarnMode

/**
 * Runs the shared [[HiveEngineTests]] against a Hive engine whose deploy mode is set to
 * YARN, using the server and Hadoop mini cluster provided by
 * [[WithKyuubiServerAndHadoopMiniCluster]].
 */
class KyuubiOperationHiveEngineYarnModeSuite
  extends HiveEngineTests
  with WithKyuubiServerAndHadoopMiniCluster {

  override protected val conf: KyuubiConf = {
    // Reserve a unique temporary path and then remove the directory again, leaving only the
    // path for the Derby URL below. NOTE(review): presumably Derby's `create=true` needs to
    // create the database directory itself - confirm.
    val metastoreDir = Utils.createTempDir(prefix = getClass.getSimpleName)
    metastoreDir.toFile.delete()
    val derbyUrl = s"jdbc:derby:;databaseName=$metastoreDir;create=true"

    KyuubiConf()
      .set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
      .set(ENGINE_TYPE, "HIVE_SQL")
      .set(KyuubiConf.ENGINE_DEPLOY_MODE, YarnMode.name)
      // Hive session state and the metastore client initialize slowly, so use a 30s idle
      // timeout unless one has already been configured.
      .setIfMissing(ENGINE_IDLE_TIMEOUT, 30000L)
      .set("javax.jdo.option.ConnectionURL", derbyUrl)
  }

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Read the YARN limit only after `super.beforeAll()` has run, and cap the engine
    // container memory at 1024.
    val engineMemory = Math.min(getYarnMaximumAllocationMb, 1024)
    conf
      .set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_MEMORY, engineMemory)
      .set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_MAX_APP_ATTEMPTS, 1)
      .set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_CORES, 1)
  }

  /** JDBC URL used by the inherited tests; delegates to `getJdbcUrl`. */
  override protected def jdbcUrl: String = getJdbcUrl
}
11 changes: 11 additions & 0 deletions kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ object Utils extends Logging {
dir
}

/**
 * List the files recursively in a directory.
 *
 * @param file the file or directory to start from
 * @return `file` itself when it is not a directory (this includes paths that do not
 *         exist); otherwise every non-directory entry found beneath it. Directories are
 *         never part of the result, and a directory whose entries cannot be listed
 *         contributes nothing.
 */
def listFilesRecursive(file: File): Seq[File] = {
  if (!file.isDirectory) {
    file :: Nil
  } else {
    // `File.listFiles` returns null rather than throwing when the directory cannot be read
    // (I/O error, or it vanished after the `isDirectory` check). Treat that as "no entries"
    // instead of failing with a NullPointerException.
    Option(file.listFiles()).fold(Seq.empty[File])(_.toSeq.flatMap(listFilesRecursive))
  }
}

/**
* Copies bytes from an InputStream source to a newly created temporary file
* created in the directory destination. The temporary file will be created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import scala.util.matching.Regex
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.{EngineType, ShareLevel}
import org.apache.kyuubi.engine.deploy.LocalMode
import org.apache.kyuubi.operation.{NoneMode, PlainStyle}
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}

Expand Down Expand Up @@ -231,6 +232,7 @@ object KyuubiConf {
final val KYUUBI_CONF_FILE_NAME = "kyuubi-defaults.conf"
final val KYUUBI_HOME = "KYUUBI_HOME"
final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv"
final val KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX = "kyuubi.engine.yarn.AMEnv"
final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf"
final val KYUUBI_KUBERNETES_CONF_PREFIX = "kyuubi.kubernetes"
final val USER_DEFAULTS_CONF_QUOTE = "___"
Expand Down Expand Up @@ -2693,6 +2695,79 @@ object KyuubiConf {
.stringConf
.createOptional

// Deploy mode of the engine. `transformToUpperCase` is applied ahead of `checkValue`, which
// is why the allowed set below is spelled in upper case; the default is `LocalMode.name`.
val ENGINE_DEPLOY_MODE: ConfigEntry[String] =
  buildConf("kyuubi.engine.deploy.mode")
    .doc("Configures the engine deploy mode, The value can be 'local', 'yarn'. " +
      "In local mode, the engine operates on the same node as the KyuubiServer. " +
      "In YARN mode, the engine runs within the Application Master (AM) container of YARN. " +
      "Currently, only Hive engine supports YARN mode.")
    .version("1.9.0")
    .stringConf
    .transformToUpperCase
    .checkValue(
      mode => Set("LOCAL", "YARN").contains(mode),
      // The message must name this entry's real key so users can find the setting.
      "Invalid value for 'kyuubi.engine.deploy.mode'. Valid values are 'local', 'yarn'.")
    .createWithDefault(LocalMode.name)

// Staging directory for submitting the engine to YARN. Optional: no default is declared.
val ENGINE_DEPLOY_YARN_MODE_STAGING_DIR: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.yarn.stagingDir")
.doc("Staging directory used while submitting kyuubi engine to YARN.")
.version("1.9.0")
.stringConf
.createOptional

// Interval between application status reports. The default is one second expressed in
// milliseconds, and `checkValue` rejects any value that is not greater than zero.
val ENGINE_DEPLOY_YARN_MODE_REPORT_INTERVAL: ConfigEntry[Long] =
buildConf("kyuubi.engine.yarn.report.interval")
.doc("Interval between reports of the current engine on yarn app status.")
.version("1.9.0")
.timeConf
.checkValue(t => t > 0, "must be positive integer")
.createWithDefault(Duration.ofSeconds(1).toMillis)

// YARN tags for the engine. Optional; the string value is turned into a Seq[String] by
// `toSequence()`. NOTE(review): the separator is whatever `toSequence()` defaults to - confirm.
val ENGINE_DEPLOY_YARN_MODE_TAGS: OptionalConfigEntry[Seq[String]] =
buildConf("kyuubi.engine.yarn.tags")
.doc(s"kyuubi engine yarn tags when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
.version("1.9.0")
.stringConf
.toSequence()
.createOptional

// YARN queue for the engine. Falls back to the queue named "default".
val ENGINE_DEPLOY_YARN_MODE_QUEUE: ConfigEntry[String] =
buildConf("kyuubi.engine.yarn.queue")
.doc(s"kyuubi engine yarn queue when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
.version("1.9.0")
.stringConf
.createWithDefault("default")

// Maximum number of AM attempts. Optional: no default is declared here.
// NOTE(review): presumably YARN's own limit applies when this is unset - confirm.
val ENGINE_DEPLOY_YARN_MODE_MAX_APP_ATTEMPTS: OptionalConfigEntry[Int] =
buildConf("kyuubi.engine.yarn.maxAppAttempts")
.doc(s"Maximum number of AM attempts before failing the app " +
s"when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
.version("1.9.0")
.intConf
.createOptional

// Engine container memory; the doc string gives the unit as mb and the default is 1024.
// No range check is declared, so non-positive values are not rejected by this entry.
val ENGINE_DEPLOY_YARN_MODE_MEMORY: ConfigEntry[Int] =
buildConf("kyuubi.engine.yarn.memory")
.doc(s"kyuubi engine container memory in mb when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
.version("1.9.0")
.intConf
.createWithDefault(1024)

// Engine container core count; defaults to 1.
// No range check is declared, so non-positive values are not rejected by this entry.
val ENGINE_DEPLOY_YARN_MODE_CORES: ConfigEntry[Int] =
buildConf("kyuubi.engine.yarn.cores")
.doc(s"kyuubi engine container core number when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
.version("1.9.0")
.intConf
.createWithDefault(1)

// Extra Java options for the AM. Optional: no default is declared.
val ENGINE_DEPLOY_YARN_MODE_JAVA_OPTIONS: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.yarn.java.options")
.doc(s"The extra Java options for the AM when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
.version("1.9.0")
.stringConf
.createOptional

val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
buildConf("kyuubi.engine.flink.memory")
.doc("The heap memory for the Flink SQL engine. Only effective in yarn session mode.")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.deploy

import java.util.Locale

import org.apache.kyuubi.Logging

/**
 * Base type of the engine deploy modes. The trait is sealed, so all of its direct
 * implementations are declared in this file.
 */
sealed trait DeployMode {

  /** The string name identifying this engine deploy mode. */
  def name: String
}

/** Deploy mode named "local". */
case object LocalMode extends DeployMode {
  override val name: String = "local"
}

/** Deploy mode named "yarn". */
case object YarnMode extends DeployMode {
  override val name: String = "yarn"
}

/** Deploy mode named "kubernetes". */
case object KubernetesMode extends DeployMode {
  override val name: String = "kubernetes"
}

object DeployMode extends Logging {

  /**
   * Resolves a deploy mode from its string name, ignoring case.
   *
   * @param mode the deploy mode name; it is lower-cased with `Locale.ROOT` before comparison
   * @return the mode whose `name` equals the lower-cased input; when none matches, a
   *         warning is logged and [[LocalMode]] is returned
   */
  def fromString(mode: String): DeployMode = {
    val normalized = mode.toLowerCase(Locale.ROOT)
    val knownModes: Seq[DeployMode] = Seq(LocalMode, YarnMode, KubernetesMode)
    knownModes.find(_.name == normalized).getOrElse {
      warn(s"Unknown deploy mode: $mode, fallback to local mode.")
      LocalMode
    }
  }
}
Loading

0 comments on commit 98ff19c

Please sign in to comment.