diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala index 77b9cd6d3b1..f43adfbc216 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala @@ -104,6 +104,16 @@ class FlinkProcessBuilder( val userJars = conf.get(ENGINE_FLINK_APPLICATION_JARS) userJars.foreach(jars => flinkExtraJars ++= jars.split(",")) + val hiveConfDirOpt = env.get("HIVE_CONF_DIR") + hiveConfDirOpt.foreach { hiveConfDir => + val hiveConfFile = Paths.get(hiveConfDir).resolve("hive-site.xml") + if (!Files.exists(hiveConfFile)) { + throw new KyuubiException(s"The file $hiveConfFile does not exists. " + + s"Please put hive-site.xml when HIVE_CONF_DIR env $hiveConfDir is configured.") + } + flinkExtraJars += s"$hiveConfFile" + } + buffer += "-t" buffer += "yarn-application" buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}" @@ -111,6 +121,10 @@ class FlinkProcessBuilder( buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}" buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=." + hiveConfDirOpt.foreach { _ => + buffer += "-Dcontainerized.master.env.HIVE_CONF_DIR=." + } + val customFlinkConf = conf.getAllWithPrefix("flink", "") customFlinkConf.filter(_._1 != "app.name").foreach { case (k, v) => buffer += s"-D$k=$v" @@ -166,6 +180,7 @@ class FlinkProcessBuilder( env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add) env.get("YARN_CONF_DIR").foreach(classpathEntries.add) env.get("HBASE_CONF_DIR").foreach(classpathEntries.add) + env.get("HIVE_CONF_DIR").foreach(classpathEntries.add) val hadoopCp = env.get(FLINK_HADOOP_CLASSPATH_KEY) hadoopCp.foreach(classpathEntries.add) val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala index 990d56f15c4..26e355a87bd 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala @@ -56,6 +56,9 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "usrlib")).toFile private val tempUdfJar = Files.createFile(Paths.get(tempUsrLib.toPath.toString, "test-udf.jar")) + private val tempHiveDir = + Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "hive-conf")).toFile + Files.createFile(Paths.get(tempHiveDir.toPath.toString, "hive-site.xml")) private def envDefault: ListMap[String, String] = ListMap( "JAVA_HOME" -> s"${File.separator}jdk", @@ -63,7 +66,8 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { private def envWithoutHadoopCLASSPATH: ListMap[String, String] = envDefault + ("HADOOP_CONF_DIR" -> s"${File.separator}hadoop${File.separator}conf") + ("YARN_CONF_DIR" -> s"${File.separator}yarn${File.separator}conf") + - ("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf") + ("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf") + + ("HIVE_CONF_DIR" -> s"$tempHiveDir") private def envWithAllHadoop: ListMap[String, String] = envWithoutHadoopCLASSPATH + (FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop") private def confStr: String = { @@ -89,10 +93,12 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { val expectedCommands = escapePaths(s"${builder.flinkExecutable} run-application ") + s"-t yarn-application " + - s"-Dyarn.ship-files=.*\\/flink-sql-client.*jar;.*\\/flink-sql-gateway.*jar;$tempUdfJar " + + s"-Dyarn.ship-files=.*\\/flink-sql-client.*jar;.*\\/flink-sql-gateway.*jar;$tempUdfJar" + + s";.*\\/hive-site\\.xml " + s"-Dyarn\\.application\\.name=kyuubi_.* " + s"-Dyarn\\.tags=KYUUBI " + s"-Dcontainerized\\.master\\.env\\.FLINK_CONF_DIR=\\. " + + s"-Dcontainerized\\.master\\.env\\.HIVE_CONF_DIR=\\. " + s"-Dexecution.target=yarn-application " + s"-c org\\.apache\\.kyuubi\\.engine\\.flink\\.FlinkSQLEngine " + s".*kyuubi-flink-sql-engine_.*jar" + @@ -151,9 +157,9 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { matchActualAndExpectedSessionMode(builder) } - test("application mode - default env") { + test("application mode - all hadoop related environment variables are configured") { val builder = new FlinkProcessBuilder("paullam", applicationModeConf) { - override def env: Map[String, String] = envDefault + override def env: Map[String, String] = envWithAllHadoop } matchActualAndExpectedApplicationMode(builder) }