From 66f94ce5d7f2b54482eeacd1a9c69581eb1851b0 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Fri, 10 May 2024 14:22:29 +0800 Subject: [PATCH] [KYUUBI #6368] Support impersonation mode for flink sql engine --- docs/configuration/settings.md | 1 + .../org/apache/kyuubi/config/KyuubiConf.scala | 8 +++++ .../engine/flink/FlinkProcessBuilder.scala | 35 ++++++++++++++++--- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index c3231cbb0b8..92b1ec6061e 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -149,6 +149,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.engine.flink.initialize.sql | SHOW DATABASES | The initialize sql for Flink engine. It fallback to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 | | kyuubi.engine.flink.java.options | <undefined> | The extra Java options for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 | | kyuubi.engine.flink.memory | 1g | The heap memory for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 | +| kyuubi.engine.flink.proxy.user.enabled | false | Whether to enable using hadoop proxy user to run flink engine. Only takes effect in kerberos environment and when `kyuubi.engine.doAs.enabled` is set to `true`. | boolean | 1.10.0 | | kyuubi.engine.hive.deploy.mode | LOCAL | Configures the hive engine deploy mode, The value can be 'local', 'yarn'. In local mode, the engine operates on the same node as the KyuubiServer. In YARN mode, the engine runs within the Application Master (AM) container of YARN. | string | 1.9.0 | | kyuubi.engine.hive.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go. | seq | 1.7.0 | | kyuubi.engine.hive.extra.classpath | <undefined> | The extra classpath for the Hive query engine, for configuring location of the hadoop client jars and etc. | string | 1.6.0 | diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index d8fec583391..e15e13bb004 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -2932,6 +2932,14 @@ object KyuubiConf { .version("1.8.1") .fallbackConf(ENGINE_INITIALIZE_SQL) + val ENGINE_FLINK_PROXY_USER_ENABLED: ConfigEntry[Boolean] = + buildConf("kyuubi.engine.flink.proxy.user.enabled") + .doc("Whether to enable using hadoop proxy user to run flink engine. Only takes effect" + + s" in kerberos environment and when `${ENGINE_DO_AS_ENABLED.key}` is set to `true`.") + .version("1.10.0") + .booleanConf + .createWithDefault(false) + val SERVER_LIMIT_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] = buildConf("kyuubi.server.limit.connections.per.user") .doc("Maximum kyuubi server connections per user." + diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala index 4ae714deefa..0d09edc66df 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala @@ -58,14 +58,31 @@ class FlinkProcessBuilder( // flink.execution.target are required in Kyuubi conf currently val executionTarget: Option[String] = conf.getOption("flink.execution.target") + private lazy val proxyUserEnable: Boolean = { + doAsEnabled && conf.get(ENGINE_FLINK_PROXY_USER_ENABLED) && + conf.getOption(s"flink.$FLINK_SECURITY_KEYTAB_KEY").isEmpty && + conf.getOption(s"flink.$FLINK_SECURITY_PRINCIPAL_KEY").isEmpty && + !conf.getOption(s"flink.$FLINK_SECURITY_DELEGATION_TOKENS_ENABLED_KEY").exists(_.toBoolean) + } + override protected def module: String = "kyuubi-flink-sql-engine" override protected def mainClass: String = "org.apache.kyuubi.engine.flink.FlinkSQLEngine" - override def env: Map[String, String] = conf.getEnvs + - ("FLINK_CONF_DIR" -> conf.getEnvs.getOrElse( - "FLINK_CONF_DIR", - s"$flinkHome${File.separator}conf")) + override def env: Map[String, String] = { + val flinkExtraEnvs = if (proxyUserEnable) { + Map( + "FLINK_CONF_DIR" -> conf.getEnvs.getOrElse( + "FLINK_CONF_DIR", + s"$flinkHome${File.separator}conf"), + FLINK_PROXY_USER_KEY -> proxyUser) + } else { + Map("FLINK_CONF_DIR" -> conf.getEnvs.getOrElse( + "FLINK_CONF_DIR", + s"$flinkHome${File.separator}conf")) + } + conf.getEnvs ++ flinkExtraEnvs + } override def clusterManager(): Option[String] = { executionTarget match { @@ -121,6 +138,13 @@ class FlinkProcessBuilder( buffer += s"-Dyarn.application.name=${conf.getOption(APP_KEY).get}" buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}" buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=." + if (proxyUserEnable && conf.getOption( + s"flink.$FLINK_SECURITY_DELEGATION_TOKENS_ENABLED_KEY").isEmpty) { + // FLINK-31109: Flink only supports hadoop proxy user only when delegation tokens fetch + // is managed outside. So we need to disable delegation tokens of flink and rely on kyuubi + // server to maintain engine's tokens. + buffer += s"-D$FLINK_SECURITY_DELEGATION_TOKENS_ENABLED_KEY=false" + } hiveConfDirOpt.foreach { _ => buffer += "-Dcontainerized.master.env.HIVE_CONF_DIR=." @@ -217,4 +241,7 @@ object FlinkProcessBuilder { final val YARN_TAG_KEY = "yarn.tags" final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH" final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER" + final val FLINK_SECURITY_KEYTAB_KEY = "security.kerberos.login.keytab" + final val FLINK_SECURITY_PRINCIPAL_KEY = "security.kerberos.login.principal" + final val FLINK_SECURITY_DELEGATION_TOKENS_ENABLED_KEY = "security.delegation.tokens.enabled" }