Skip to content

Commit

Permalink
conf option
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenliang123 committed Nov 27, 2023
1 parent c8f4e9c commit 5bb8f48
Show file tree
Hide file tree
Showing 17 changed files with 138 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, KyuubiFunSuite, SCALA
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
import org.apache.kyuubi.util.command.CommandUtils._
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
import org.apache.kyuubi.zookeeper.ZookeeperConf.{ZK_CLIENT_PORT, ZK_CLIENT_PORT_ADDRESS}

Expand Down Expand Up @@ -166,10 +167,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
command += classpathEntries.asScala.mkString(File.pathSeparator)
command += "org.apache.kyuubi.engine.flink.FlinkSQLEngine"

conf.getAll.foreach { case (k, v) =>
command += "--conf"
command += s"$k=$v"
}
command ++= confKeyValues(conf.getAll)

processBuilder.command(command.toList.asJava)
processBuilder.redirectOutput(Redirect.INHERIT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, SCALA_COMPILE_VERSION,
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS, KYUUBI_HOME}
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
import org.apache.kyuubi.util.command.CommandUtils._
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
import org.apache.kyuubi.zookeeper.ZookeeperConf.{ZK_CLIENT_PORT, ZK_CLIENT_PORT_ADDRESS}

Expand Down Expand Up @@ -179,10 +180,7 @@ trait WithFlinkSQLEngineOnYarn extends KyuubiFunSuite with WithFlinkTestResource
conf.set(k, v)
}

for ((k, v) <- conf.getAll) {
command += "--conf"
command += s"$k=$v"
}
command ++= confKeyValues(conf.getAll)

processBuilder.command(command.toList.asJava)
processBuilder.redirectOutput(Redirect.INHERIT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.util.command.CommandUtils._

class HiveCatalogDatabaseOperationSuite extends HiveJDBCTestHelper {

override def beforeAll(): Unit = {
val metastore = Utils.createTempDir(prefix = getClass.getSimpleName)
metastore.toFile.delete()
val args = Array(
"--conf",
CONF,
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true",
"--conf",
CONF,
s"${ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key}=true")
HiveSQLEngine.main(args)
super.beforeAll()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.kyuubi.{HiveEngineTests, KYUUBI_VERSION, Utils}
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
import org.apache.kyuubi.util.command.CommandUtils._

class HiveOperationSuite extends HiveEngineTests {

override def beforeAll(): Unit = {
val metastore = Utils.createTempDir(prefix = getClass.getSimpleName)
metastore.toFile.delete()
val args = Array(
"--conf",
CONF,
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true")
HiveSQLEngine.main(args)
super.beforeAll()
Expand Down
7 changes: 4 additions & 3 deletions kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.hadoop.util.ShutdownHookManager

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.internal.Tests.IS_TESTING
import org.apache.kyuubi.util.command.CommandUtils._

object Utils extends Logging {

Expand Down Expand Up @@ -325,7 +326,7 @@ object Utils extends Logging {
require(args.length % 2 == 0, s"Illegal size of arguments.")
for (i <- args.indices by 2) {
require(
args(i) == "--conf",
args(i) == CONF,
s"Unrecognized main arguments prefix ${args(i)}," +
s"the argument format is '--conf k=v'.")

Expand All @@ -347,9 +348,9 @@ object Utils extends Logging {
case PATTERN_FOR_KEY_VALUE_ARG(key, value) if nextKV =>
val (_, newValue) = redact(redactionPattern, Seq((key, value))).head
nextKV = false
s"$key=$newValue"
getKeyValuePair(key, newValue)

case cmd if cmd == "--conf" =>
case cmd if cmd == CONF =>
nextKV = true
cmd

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.SERVER_SECRET_REDACTION_PATTERN
import org.apache.kyuubi.util.command.CommandUtils._

class UtilsSuite extends KyuubiFunSuite {

Expand Down Expand Up @@ -158,13 +159,13 @@ class UtilsSuite extends KyuubiFunSuite {

val buffer = new ArrayBuffer[String]()
buffer += "main"
buffer += "--conf"
buffer += CONF
buffer += "kyuubi.my.password=sensitive_value"
buffer += "--conf"
buffer += CONF
buffer += "kyuubi.regular.property1=regular_value"
buffer += "--conf"
buffer += CONF
buffer += "kyuubi.my.secret=sensitive_value"
buffer += "--conf"
buffer += CONF
buffer += "kyuubi.regular.property2=regular_value"

val commands = buffer.toArray
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ProcBuilder
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.command.CommandUtils._

class ChatProcessBuilder(
override val proxyUser: String,
Expand Down Expand Up @@ -91,13 +92,10 @@ class ChatProcessBuilder(
buffer += classpathEntries.asScala.mkString(File.pathSeparator)
buffer += mainClass

buffer += "--conf"
buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser)

buffer ++= confKeyValues(conf.getAll)

conf.getAll.foreach { case (k, v) =>
buffer += "--conf"
buffer += s"$k=$v"
}
buffer.toArray
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.command.CommandUtils._

/**
* A builder to build flink sql engine progress.
Expand Down Expand Up @@ -134,14 +135,9 @@ class FlinkProcessBuilder(
buffer += s"$mainClass"
buffer += s"${mainResource.get}"

buffer += "--conf"
buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
conf.getAll.foreach { case (k, v) =>
if (k.startsWith("kyuubi.")) {
buffer += "--conf"
buffer += s"$k=$v"
}
}
buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser)

buffer ++= confKeyValues(conf.getAll.filter(_._1.startsWith("kyuubi.")))

buffer.toArray

Expand Down Expand Up @@ -204,13 +200,10 @@ class FlinkProcessBuilder(
buffer += classpathEntries.asScala.mkString(File.pathSeparator)
buffer += mainClass

buffer += "--conf"
buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser)

buffer ++= confKeyValues(conf.getAll)

conf.getAll.foreach { case (k, v) =>
buffer += "--conf"
buffer += s"$k=$v"
}
buffer.toArray
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID, KYUUBI_SES
import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.engine.hive.HiveProcessBuilder._
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.command.CommandUtils._

class HiveProcessBuilder(
override val proxyUser: String,
Expand Down Expand Up @@ -104,14 +105,11 @@ class HiveProcessBuilder(
buffer += classpathEntries.asScala.mkString(File.pathSeparator)
buffer += mainClass

buffer += "--conf"
buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
buffer += "--conf"
buffer += s"$KYUUBI_ENGINE_ID=$engineRefId"
buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser)
buffer ++= confKeyValue(KYUUBI_ENGINE_ID, engineRefId)

for ((k, v) <- conf.getAll) {
buffer += "--conf"
buffer += s"$k=$v"
buffer ++= confKeyValue(k, v)
}
buffer.toArray
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_PASSWORD, ENG
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ProcBuilder
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.command.CommandUtils._

class JdbcProcessBuilder(
override val proxyUser: String,
Expand Down Expand Up @@ -94,13 +95,10 @@ class JdbcProcessBuilder(
buffer += classpathEntries.asScala.mkString(File.pathSeparator)
buffer += mainClass

buffer += "--conf"
buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser)

buffer ++= confKeyValues(conf.getAll)

for ((k, v) <- conf.getAll) {
buffer += "--conf"
buffer += s"$k=$v"
}
buffer.toArray
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.command.CommandUtils._

class SparkBatchProcessBuilder(
override val proxyUser: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.{KubernetesUtils, Validator}
import org.apache.kyuubi.util.command.CommandUtils._

class SparkProcessBuilder(
override val proxyUser: String,
Expand Down Expand Up @@ -141,8 +142,7 @@ class SparkProcessBuilder(
}
// pass spark engine log path to spark conf
(allConf ++ engineLogPathConf ++ appendPodNameConf(allConf)).foreach { case (k, v) =>
buffer += CONF
buffer += s"${convertConfigKey(k)}=$v"
buffer ++= confKeyValue(convertConfigKey(k), v)
}

setupKerberos(buffer)
Expand Down Expand Up @@ -289,10 +289,8 @@ class SparkProcessBuilder(
def setSparkUserName(userName: String, buffer: ArrayBuffer[String]): Unit = {
clusterManager().foreach { cm =>
if (cm.toUpperCase.startsWith("K8S")) {
buffer += CONF
buffer += s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$userName"
buffer += CONF
buffer += s"spark.executorEnv.SPARK_USER_NAME=$userName"
buffer ++= confKeyValue("spark.kubernetes.driverEnv.SPARK_USER_NAME", userName)
buffer ++= confKeyValue("spark.executorEnv.SPARK_USER_NAME", userName)
}
}
}
Expand Down Expand Up @@ -335,7 +333,6 @@ object SparkProcessBuilder {
"spark.kubernetes.kerberos.krb5.path",
"spark.kubernetes.file.upload.path")

final private[spark] val CONF = "--conf"
final private[spark] val CLASS = "--class"
final private[spark] val PROXY_USER = "--proxy-user"
final private[spark] val SPARK_FILES = "spark.files"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.command.CommandUtils._

class TrinoProcessBuilder(
override val proxyUser: String,
Expand Down Expand Up @@ -97,13 +98,10 @@ class TrinoProcessBuilder(
// user.name
// kyuubi.session.user
// or just leave it, because we can handle it at operation layer
buffer += "--conf"
buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser)

buffer ++= confKeyValues(conf.getAll)

for ((k, v) <- conf.getAll) {
buffer += "--conf"
buffer += s"$k=$v"
}
buffer.toArray
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS, ENGINE_FLINK_EXTRA_CLASSPATH, ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
import org.apache.kyuubi.util.command.CommandUtils._

class FlinkProcessBuilderSuite extends KyuubiFunSuite {
private def sessionModeConf = KyuubiConf()
Expand Down Expand Up @@ -73,7 +74,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
private def confStr: String = {
sessionModeConf.clone.getAll
.filter(!_._1.equals(KYUUBI_ENGINE_CREDENTIALS_KEY))
.map { case (k, v) => s"\\\\\\n\\t--conf $k=$v" }
.map { case (k, v) => s"\\\\\\n\\t${confKeyValueStr(k, v)}" }
.mkString(" ")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.service.ServiceUtils
import org.apache.kyuubi.util.AssertionUtils._
import org.apache.kyuubi.util.command.CommandUtils._

class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
private def conf = KyuubiConf().set("kyuubi.on", "off")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.util.command

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object CommandUtils {
val CONF = "--conf"

/**
* Assemble key value pair with "=" seperator
*
* @param key
* @param value
* @return
*/
def getKeyValuePair(key: String, value: String): String = s"$key=$value".trim

/**
* Assemble key value pair with config option prefix
*
* @param key
* @param value
* @return
*/
def confKeyValue(key: String, value: String): Traversable[String] =
Stream(CONF, getKeyValuePair(key, value))

def confKeyValueStr(key: String, value: String): String =
s"$CONF ${getKeyValuePair(key, value)}"

def confKeyValues(configs: Iterable[(String, String)]): Traversable[String] =
configs.flatMap { case (k, v) => confKeyValue(k, v) }.toStream
}
Loading

0 comments on commit 5bb8f48

Please sign in to comment.