-
Notifications
You must be signed in to change notification settings - Fork 914
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[KYUUBI #5867] HiveEngine support run on YARN mode #5868
Changes from 2 commits
98ff19c
2ed1d44
1b0b77f
1fa18ba
d89d09c
d1eb5ae
5e5045e
34a67b4
6b97c42
5474ebf
3c17d2c
44f7287
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kyuubi.engine.hive.deploy | ||
|
||
import org.apache.kyuubi.Utils | ||
import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter | ||
import org.apache.kyuubi.engine.hive.HiveSQLEngine | ||
import org.apache.kyuubi.util.KyuubiHadoopUtils | ||
|
||
object HiveYarnModeSubmitter extends EngineYarnModeSubmitter { | ||
|
||
def main(args: Array[String]): Unit = { | ||
Utils.fromCommandLineArgs(args, kyuubiConf) | ||
// Initialize the engine submitter. | ||
init() | ||
// Submit the engine application to YARN. | ||
submitApplication() | ||
} | ||
|
||
private def init(): Unit = { | ||
yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf) | ||
hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf) | ||
} | ||
|
||
override var engineType: String = "hive" | ||
|
||
override def engineMainClass(): String = HiveSQLEngine.getClass.getName | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kyuubi.it.hive.operation | ||
|
||
import org.apache.kyuubi.{HiveEngineTests, Utils, WithKyuubiServerAndHadoopMiniCluster} | ||
import org.apache.kyuubi.config.KyuubiConf | ||
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_IDLE_TIMEOUT, ENGINE_TYPE, KYUUBI_ENGINE_ENV_PREFIX, KYUUBI_HOME} | ||
import org.apache.kyuubi.engine.deploy.YarnMode | ||
|
||
class KyuubiOperationHiveEngineYarnModeSuite extends HiveEngineTests | ||
with WithKyuubiServerAndHadoopMiniCluster { | ||
|
||
override protected val conf: KyuubiConf = { | ||
val metastore = Utils.createTempDir(prefix = getClass.getSimpleName) | ||
metastore.toFile.delete() | ||
KyuubiConf() | ||
.set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome) | ||
.set(ENGINE_TYPE, "HIVE_SQL") | ||
.set(KyuubiConf.ENGINE_DEPLOY_MODE, YarnMode.name) | ||
// increase this to 30s as hive session state and metastore client is slow initializing | ||
.setIfMissing(ENGINE_IDLE_TIMEOUT, 30000L) | ||
.set("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastore;create=true") | ||
} | ||
|
||
override def beforeAll(): Unit = { | ||
super.beforeAll() | ||
conf | ||
.set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_MEMORY, Math.min(getYarnMaximumAllocationMb, 1024)) | ||
.set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_MAX_APP_ATTEMPTS, 1) | ||
.set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_CORES, 1) | ||
} | ||
|
||
override protected def jdbcUrl: String = getJdbcUrl | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ import scala.util.matching.Regex | |
import org.apache.kyuubi.{Logging, Utils} | ||
import org.apache.kyuubi.config.KyuubiConf._ | ||
import org.apache.kyuubi.engine.{EngineType, ShareLevel} | ||
import org.apache.kyuubi.engine.deploy.LocalMode | ||
import org.apache.kyuubi.operation.{NoneMode, PlainStyle} | ||
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP} | ||
|
||
|
@@ -231,6 +232,7 @@ object KyuubiConf { | |
final val KYUUBI_CONF_FILE_NAME = "kyuubi-defaults.conf" | ||
final val KYUUBI_HOME = "KYUUBI_HOME" | ||
final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv" | ||
final val KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX = "kyuubi.engine.yarn.AMEnv" | ||
final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf" | ||
final val KYUUBI_KUBERNETES_CONF_PREFIX = "kyuubi.kubernetes" | ||
final val USER_DEFAULTS_CONF_QUOTE = "___" | ||
|
@@ -2693,6 +2695,79 @@ object KyuubiConf { | |
.stringConf | ||
.createOptional | ||
|
||
val ENGINE_DEPLOY_MODE: ConfigEntry[String] = | ||
buildConf("kyuubi.engine.deploy.mode") | ||
.doc("Configures the engine deploy mode, The value can be 'local', 'yarn'. " + | ||
"In local mode, the engine operates on the same node as the KyuubiServer. " + | ||
"In YARN mode, the engine runs within the Application Master (AM) container of YARN. " + | ||
yikf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"Currently, only Hive engine supports YARN mode.") | ||
yikf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.version("1.9.0") | ||
.stringConf | ||
.transformToUpperCase | ||
.checkValue( | ||
mode => Set("LOCAL", "YARN").contains(mode), | ||
"Invalid value for 'kyuubi.engine.hive.deploy.mode'. Valid values are 'local', 'yarn'.") | ||
.createWithDefault(LocalMode.name) | ||
|
||
val ENGINE_DEPLOY_YARN_MODE_STAGING_DIR: OptionalConfigEntry[String] = | ||
buildConf("kyuubi.engine.yarn.stagingDir") | ||
.doc("Staging directory used while submitting kyuubi engine to YARN.") | ||
yikf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.version("1.9.0") | ||
.stringConf | ||
.createOptional | ||
|
||
val ENGINE_DEPLOY_YARN_MODE_REPORT_INTERVAL: ConfigEntry[Long] = | ||
buildConf("kyuubi.engine.yarn.report.interval") | ||
.doc("Interval between reports of the current engine on yarn app status.") | ||
.version("1.9.0") | ||
.timeConf | ||
.checkValue(t => t > 0, "must be positive integer") | ||
.createWithDefault(Duration.ofSeconds(1).toMillis) | ||
|
||
val ENGINE_DEPLOY_YARN_MODE_TAGS: OptionalConfigEntry[Seq[String]] = | ||
buildConf("kyuubi.engine.yarn.tags") | ||
.doc(s"kyuubi engine yarn tags when `${ENGINE_DEPLOY_MODE.key}` is YARN.") | ||
.version("1.9.0") | ||
.stringConf | ||
.toSequence() | ||
.createOptional | ||
|
||
val ENGINE_DEPLOY_YARN_MODE_QUEUE: ConfigEntry[String] = | ||
buildConf("kyuubi.engine.yarn.queue") | ||
.doc(s"kyuubi engine yarn queue when `${ENGINE_DEPLOY_MODE.key}` is YARN.") | ||
.version("1.9.0") | ||
.stringConf | ||
.createWithDefault("default") | ||
|
||
val ENGINE_DEPLOY_YARN_MODE_MAX_APP_ATTEMPTS: OptionalConfigEntry[Int] = | ||
buildConf("kyuubi.engine.yarn.maxAppAttempts") | ||
yikf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.doc(s"Maximum number of AM attempts before failing the app " + | ||
s"when `${ENGINE_DEPLOY_MODE.key}` is YARN.") | ||
.version("1.9.0") | ||
.intConf | ||
.createOptional | ||
|
||
val ENGINE_DEPLOY_YARN_MODE_MEMORY: ConfigEntry[Int] = | ||
buildConf("kyuubi.engine.yarn.memory") | ||
.doc(s"kyuubi engine container memory in mb when `${ENGINE_DEPLOY_MODE.key}` is YARN.") | ||
.version("1.9.0") | ||
.intConf | ||
.createWithDefault(1024) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use human-readable format? e.g. 1g, 1024m There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, plan support bytesConf in the follow up PR |
||
|
||
val ENGINE_DEPLOY_YARN_MODE_CORES: ConfigEntry[Int] = | ||
buildConf("kyuubi.engine.yarn.cores") | ||
.doc(s"kyuubi engine container core number when `${ENGINE_DEPLOY_MODE.key}` is YARN.") | ||
.version("1.9.0") | ||
.intConf | ||
.createWithDefault(1) | ||
|
||
val ENGINE_DEPLOY_YARN_MODE_JAVA_OPTIONS: OptionalConfigEntry[String] = | ||
buildConf("kyuubi.engine.yarn.java.options") | ||
.doc(s"The extra Java options for the AM when `${ENGINE_DEPLOY_MODE.key}` is YARN.") | ||
.version("1.9.0") | ||
.stringConf | ||
.createOptional | ||
|
||
val ENGINE_FLINK_MEMORY: ConfigEntry[String] = | ||
buildConf("kyuubi.engine.flink.memory") | ||
.doc("The heap memory for the Flink SQL engine. Only effective in yarn session mode.") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kyuubi.engine.deploy | ||
|
||
import java.util.Locale | ||
|
||
import org.apache.kyuubi.Logging | ||
|
||
sealed trait DeployMode { | ||
yikf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/** | ||
* String name of the engine deploy mode. | ||
*/ | ||
def name: String | ||
} | ||
|
||
case object LocalMode extends DeployMode { val name = "local" } | ||
|
||
case object YarnMode extends DeployMode { val name = "yarn" } | ||
|
||
case object KubernetesMode extends DeployMode { val name = "kubernetes" } | ||
|
||
object DeployMode extends Logging { | ||
|
||
/** | ||
* Returns the engine deploy mode from the given string. | ||
*/ | ||
def fromString(mode: String): DeployMode = mode.toLowerCase(Locale.ROOT) match { | ||
case LocalMode.name => LocalMode | ||
case YarnMode.name => YarnMode | ||
case KubernetesMode.name => KubernetesMode | ||
case _ => | ||
warn(s"Unknown deploy mode: $mode, fallback to local mode.") | ||
LocalMode | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe
selfExited
?