Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #5867] HiveEngine support run on YARN mode #5868

Closed
wants to merge 12 commits into from
9 changes: 9 additions & 0 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ class HiveSQLEngine extends Serverable("HiveSQLEngine") {
super.start()
// Start engine self-terminating checker after all services are ready and it can be reached by
// all servers in engine spaces.
backendService.sessionManager.startTerminatingChecker(() => stop())
backendService.sessionManager.startTerminatingChecker(() => {
selfExist = true
Copy link
Contributor

@zhouyifan279 zhouyifan279 Dec 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe selfExited ?

stop()
})
}

override protected def stopServer(): Unit = {
Expand Down Expand Up @@ -151,7 +154,8 @@ object HiveSQLEngine extends Logging {
}

} catch {
case t: Throwable => currentEngine match {
case t: Throwable =>
currentEngine match {
case Some(engine) =>
engine.stop()
val event = HiveEngineEvent(engine)
Expand All @@ -160,6 +164,7 @@ object HiveSQLEngine extends Logging {
case _ =>
error(s"Failed to start Hive SQL engine: ${t.getMessage}.", t)
}
throw t
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.hive.deploy

import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.util.KyuubiHadoopUtils

/**
 * Command-line entry point that submits the Hive SQL engine as a YARN application.
 */
object HiveYarnModeSubmitter extends EngineYarnModeSubmitter {

  // Engine type handled by this submitter.
  override var engineType: String = "hive"

  /**
   * Name of the class to launch as the engine.
   *
   * NOTE(review): `getClass` on a Scala object yields its module class, whose name ends
   * with `$` -- presumably the launching side expects that form; confirm against the caller.
   */
  override def engineMainClass(): String = HiveSQLEngine.getClass.getName

  def main(args: Array[String]): Unit = {
    // Hand the command-line arguments and `kyuubiConf` to the shared argument helper.
    Utils.fromCommandLineArgs(args, kyuubiConf)

    // Initialize the engine submitter: derive the YARN and Hadoop configurations
    // from the Kyuubi configuration.
    yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf)
    hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf)

    // Submit the engine application to YARN.
    submitApplication()
  }
}
31 changes: 31 additions & 0 deletions integration-tests/kyuubi-hive-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,37 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- YARN -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.activation</groupId>
<artifactId>jakarta.activation-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.it.hive.operation

import org.apache.kyuubi.{HiveEngineTests, Utils, WithKyuubiServerAndHadoopMiniCluster}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_IDLE_TIMEOUT, ENGINE_TYPE, KYUUBI_ENGINE_ENV_PREFIX, KYUUBI_HOME}
import org.apache.kyuubi.engine.deploy.YarnMode

/**
 * Runs the shared [[HiveEngineTests]] with the engine type set to `HIVE_SQL` and the
 * engine deploy mode set to YARN, on top of the Kyuubi server / Hadoop mini cluster fixture.
 */
class KyuubiOperationHiveEngineYarnModeSuite extends HiveEngineTests
  with WithKyuubiServerAndHadoopMiniCluster {

  override protected val conf: KyuubiConf = {
    // Obtain a unique temporary path and remove the directory right away.
    // Presumably this lets Derby create the database there itself (`create=true`) -- confirm.
    val metastorePath = Utils.createTempDir(prefix = getClass.getSimpleName)
    metastorePath.toFile.delete()

    val homeEnvKey = s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME"
    val derbyUrl = s"jdbc:derby:;databaseName=$metastorePath;create=true"

    KyuubiConf()
      .set(homeEnvKey, kyuubiHome)
      .set(ENGINE_TYPE, "HIVE_SQL")
      .set(KyuubiConf.ENGINE_DEPLOY_MODE, YarnMode.name)
      // increase this to 30s as hive session state and metastore client is slow initializing
      .setIfMissing(ENGINE_IDLE_TIMEOUT, 30000L)
      .set("javax.jdo.option.ConnectionURL", derbyUrl)
  }

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Request at most 1024 (MB), capped by the value reported by getYarnMaximumAllocationMb.
    val engineMemoryMb = Math.min(getYarnMaximumAllocationMb, 1024)
    conf
      .set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_MEMORY, engineMemoryMb)
      .set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_MAX_APP_ATTEMPTS, 1)
      .set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_CORES, 1)
  }

  override protected def jdbcUrl: String = getJdbcUrl
}
11 changes: 11 additions & 0 deletions kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ object Utils extends Logging {
dir
}

/**
 * List the files recursively in a directory.
 *
 * @param file the starting point; if it is not a directory it is returned as the only element
 * @return all non-directory entries found beneath `file` (directories themselves are not
 *         included). A directory whose content cannot be listed contributes no entries.
 */
def listFilesRecursive(file: File): Seq[File] = {
  if (!file.isDirectory) {
    file :: Nil
  } else {
    // File.listFiles() returns null (not an empty array) on an I/O error or when the
    // directory disappears after the isDirectory check; treat that as "no children"
    // instead of failing with a NullPointerException.
    Option(file.listFiles())
      .fold(Seq.empty[File])(_.toSeq.flatMap(listFilesRecursive))
  }
}

/**
* Copies bytes from an InputStream source to a newly created temporary file
* created in the directory destination. The temporary file will be created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import scala.util.matching.Regex
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.{EngineType, ShareLevel}
import org.apache.kyuubi.engine.deploy.LocalMode
import org.apache.kyuubi.operation.{NoneMode, PlainStyle}
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}

Expand Down Expand Up @@ -231,6 +232,7 @@ object KyuubiConf {
// File name of the Kyuubi defaults configuration file.
final val KYUUBI_CONF_FILE_NAME = "kyuubi-defaults.conf"
// Name used for the Kyuubi home location (presumably an environment variable -- confirm).
final val KYUUBI_HOME = "KYUUBI_HOME"
// Key prefix for engine environment settings.
final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv"
// Key prefix "kyuubi.engine.yarn.AMEnv"; presumably environment entries for the YARN
// application master that hosts the engine -- TODO confirm against the submitter.
final val KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX = "kyuubi.engine.yarn.AMEnv"
// Key prefix for batch-related configuration.
final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf"
// Key prefix for Kubernetes-related configuration.
final val KYUUBI_KUBERNETES_CONF_PREFIX = "kyuubi.kubernetes"
// Quote token "___" used with user defaults configuration keys (exact usage not shown here).
final val USER_DEFAULTS_CONF_QUOTE = "___"
Expand Down Expand Up @@ -2693,6 +2695,79 @@ object KyuubiConf {
.stringConf
.createOptional

/**
 * Engine deploy mode (`kyuubi.engine.deploy.mode`).
 *
 * The configured value is upper-cased before validation and must be one of `LOCAL` or
 * `YARN`; the default is [[LocalMode]]'s name.
 */
val ENGINE_DEPLOY_MODE: ConfigEntry[String] =
  buildConf("kyuubi.engine.deploy.mode")
    .doc("Configures the engine deploy mode, The value can be 'local', 'yarn'. " +
      "In local mode, the engine operates on the same node as the KyuubiServer. " +
      "In YARN mode, the engine runs within the Application Master (AM) container of YARN. " +
      "Currently, only Hive engine supports YARN mode.")
    .version("1.9.0")
    .stringConf
    .transformToUpperCase
    .checkValue(
      mode => Set("LOCAL", "YARN").contains(mode),
      // The message must name this entry's actual key so users can locate the bad setting.
      "Invalid value for 'kyuubi.engine.deploy.mode'. Valid values are 'local', 'yarn'.")
    .createWithDefault(LocalMode.name)

// Optional string entry `kyuubi.engine.yarn.stagingDir`; no default value is defined.
val ENGINE_DEPLOY_YARN_MODE_STAGING_DIR: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.yarn.stagingDir")
.doc("Staging directory used while submitting kyuubi engine to YARN.")
yikf marked this conversation as resolved.
Show resolved Hide resolved
.version("1.9.0")
.stringConf
.createOptional

// Time entry `kyuubi.engine.yarn.report.interval`: must be strictly positive and
// defaults to one second, expressed in milliseconds.
val ENGINE_DEPLOY_YARN_MODE_REPORT_INTERVAL: ConfigEntry[Long] =
buildConf("kyuubi.engine.yarn.report.interval")
.doc("Interval between reports of the current engine on yarn app status.")
.version("1.9.0")
.timeConf
.checkValue(t => t > 0, "must be positive integer")
.createWithDefault(Duration.ofSeconds(1).toMillis)

// Optional entry `kyuubi.engine.yarn.tags`: a string value converted to a sequence of
// strings via `toSequence()`; no default value is defined.
val ENGINE_DEPLOY_YARN_MODE_TAGS: OptionalConfigEntry[Seq[String]] =
buildConf("kyuubi.engine.yarn.tags")
.doc(s"kyuubi engine yarn tags when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
.version("1.9.0")
.stringConf
.toSequence()
.createOptional

// String entry `kyuubi.engine.yarn.queue`; defaults to the queue named "default".
val ENGINE_DEPLOY_YARN_MODE_QUEUE: ConfigEntry[String] =
buildConf("kyuubi.engine.yarn.queue")
.doc(s"kyuubi engine yarn queue when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
.version("1.9.0")
.stringConf
.createWithDefault("default")

// Optional integer entry `kyuubi.engine.yarn.maxAppAttempts`; no default value is defined.
val ENGINE_DEPLOY_YARN_MODE_MAX_APP_ATTEMPTS: OptionalConfigEntry[Int] =
buildConf("kyuubi.engine.yarn.maxAppAttempts")
yikf marked this conversation as resolved.
Show resolved Hide resolved
.doc(s"Maximum number of AM attempts before failing the app " +
s"when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
.version("1.9.0")
.intConf
.createOptional

// Integer entry `kyuubi.engine.yarn.memory`, documented as a size in MB; defaults to 1024.
val ENGINE_DEPLOY_YARN_MODE_MEMORY: ConfigEntry[Int] =
buildConf("kyuubi.engine.yarn.memory")
.doc(s"kyuubi engine container memory in mb when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
.version("1.9.0")
.intConf
.createWithDefault(1024)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use human-readable format? e.g. 1g, 1024m

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I plan to support bytesConf in a follow-up PR.


// Integer entry `kyuubi.engine.yarn.cores`; defaults to 1.
val ENGINE_DEPLOY_YARN_MODE_CORES: ConfigEntry[Int] =
buildConf("kyuubi.engine.yarn.cores")
.doc(s"kyuubi engine container core number when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
.version("1.9.0")
.intConf
.createWithDefault(1)

// Optional string entry `kyuubi.engine.yarn.java.options`; no default value is defined.
val ENGINE_DEPLOY_YARN_MODE_JAVA_OPTIONS: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.yarn.java.options")
.doc(s"The extra Java options for the AM when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
.version("1.9.0")
.stringConf
.createOptional

val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
buildConf("kyuubi.engine.flink.memory")
.doc("The heap memory for the Flink SQL engine. Only effective in yarn session mode.")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.deploy

import java.util.Locale

import org.apache.kyuubi.Logging

/**
 * Closed set of engine deploy modes. Being sealed, all implementations live in this file.
 */
sealed trait DeployMode {
yikf marked this conversation as resolved.
Show resolved Hide resolved

/**
* String name of the engine deploy mode.
*/
def name: String
}

/** Deploy mode whose name is "local". */
case object LocalMode extends DeployMode {
  override val name: String = "local"
}

/** Deploy mode whose name is "yarn". */
case object YarnMode extends DeployMode {
  override val name: String = "yarn"
}

/** Deploy mode whose name is "kubernetes". */
case object KubernetesMode extends DeployMode {
  override val name: String = "kubernetes"
}

object DeployMode extends Logging {

  /**
   * Resolves an engine deploy mode from its name, ignoring case.
   *
   * @param mode name of the deploy mode, e.g. "local", "yarn" or "kubernetes"
   * @return the matching mode; an unrecognised name logs a warning and yields [[LocalMode]]
   */
  def fromString(mode: String): DeployMode = {
    // Lower-case with a fixed locale so the comparison does not depend on the JVM default.
    val normalized = mode.toLowerCase(Locale.ROOT)
    val knownModes = Seq[DeployMode](LocalMode, YarnMode, KubernetesMode)
    knownModes.find(_.name == normalized).getOrElse {
      warn(s"Unknown deploy mode: $mode, fallback to local mode.")
      LocalMode
    }
  }
}
Loading
Loading