[KYUUBI #6368] Flink engine supports user impersonation #6383
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@ Coverage Diff @@
##           master   #6383   +/- ##
====================================
  Coverage    0.00%   0.00%
====================================
  Files         684     684
  Lines       42281   42333   +52
  Branches     5768    5772    +4
====================================
- Misses      42281   42333   +52
kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala (outdated review thread, resolved)
@@ -68,12 +68,10 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with
      }.toMap

      try {
        // Renewer is not needed. But setting a renewer can avoid potential NPE.
        val renewer = UserGroupInformation.getCurrentUser.getUserName
this change is worth a dedicated PR.
@@ -58,14 +61,30 @@ class FlinkProcessBuilder(
  // flink.execution.target are required in Kyuubi conf currently
  val executionTarget: Option[String] = conf.getOption("flink.execution.target")

  private lazy val proxyUserEnable: Boolean = {
    doAsEnabled && conf.get(ENGINE_FLINK_DOAS_ENABLED) &&
      conf.getOption(s"$FLINK_CONF_PREFIX.$FLINK_SECURITY_KEYTAB_KEY").isEmpty &&
in Spark, when doAs is enabled, we actually have a constraint to ensure that the principal should always be the session user.
The implementation of the Flink proxy user is different from Spark's built-in `--proxy-user`. It relies on `HADOOP_PROXY_USER` and has some limitations; it is difficult for us to control the client's behavior well.
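To illustrate the difference, here is a minimal pure-Scala sketch (all names and paths are hypothetical, not Kyuubi's actual API) of how a process builder might inject `HADOOP_PROXY_USER` into the submit-process environment, as opposed to Spark-style impersonation, which appends an explicit `--proxy-user` argument:

```scala
// Hypothetical sketch: Flink-style impersonation passes the session user via
// the HADOOP_PROXY_USER environment variable, while Spark-style impersonation
// appends an explicit --proxy-user flag to the submit command.
object ProxyUserSketch {
  def flinkStyleEnv(sessionUser: String, proxyUserEnabled: Boolean): Map[String, String] = {
    val base = Map("FLINK_CONF_DIR" -> "/etc/flink/conf") // placeholder path
    if (proxyUserEnabled) base + ("HADOOP_PROXY_USER" -> sessionUser) else base
  }

  def sparkStyleCommand(sessionUser: String, proxyUserEnabled: Boolean): Seq[String] = {
    val base = Seq("spark-submit")
    if (proxyUserEnabled) base ++ Seq("--proxy-user", sessionUser) else base
  }
}
```

Because the proxy user rides along in the environment rather than the command line, the server has less direct control over what the Flink client does with it.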
we should unify the concept on the Kyuubi layer as much as possible.
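The gating condition quoted in the diff above can be modeled as a pure predicate. This is a simplified sketch with hypothetical parameter names, not the actual `FlinkProcessBuilder` code:

```scala
// Simplified model of proxyUserEnable: impersonation is used only when both
// doAs switches are on and the user has not supplied their own keytab via
// the Flink conf (the key behind FLINK_SECURITY_KEYTAB_KEY).
def proxyUserEnable(
    doAsEnabled: Boolean,
    flinkDoAsEnabled: Boolean,
    userKeytabConf: Option[String]): Boolean =
  doAsEnabled && flinkDoAsEnabled && userKeytabConf.isEmpty
```

A user-provided keytab disables impersonation because the engine would then authenticate as that principal directly.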
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala (outdated review thread, resolved)
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala (outdated review thread, resolved)
  private def generateTokenFile(): Option[(String, String)] = {
    // We disabled `hadoopfs` token service, which may cause yarn client to miss hdfs token.
    // So we generate a hadoop token file to pass kyuubi engine tokens to submit process.
    // TODO: Removed this after FLINK-35525, delegation tokens will be passed by `kyuubi` provider
I'm hesitant to mix the two approaches up; maybe we need a switch.
FLINK-35525 has not yet been released in a stable version; the Hadoop token file approach is currently the only effective one.
Flink 1.20 is on the way.
We can simply make the decision based on the Flink version, but considering vendors are likely to backport patches to their internal distributions, an explicit switch is preferred.
an explicit switch is preferred.
Do you mean add a configuration to control this behavior?
Yes, a new configuration. Given the current approach is hacky and will be eventually removed, we can mark the configuration as internal.
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala (outdated review thread, resolved)
...c/main/java/org/apache/kyuubi/engine/flink/security/token/KyuubiDelegationTokenProvider.java (outdated review thread, resolved)
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
unnecessary
  val ENGINE_FLINK_DOAS_ENABLED: ConfigEntry[Boolean] =
    buildConf("kyuubi.engine.flink.doAs.enabled")
      .doc("Whether to enable using hadoop proxy user to run flink engine. Only takes effect" +
        s" in kerberos environment and when `${ENGINE_DO_AS_ENABLED.key}` is set to `true`.")
this configuration should be independent of `kyuubi.engine.doAs.enabled`. And we should fail the Flink engine bootstrap when the user enables the configuration in a non-Kerberized environment.
the docs might be: "When enabled, the session user is used as the proxy user to launch the Flink engine, otherwise, the server user. Note, due to the limitation of Apache Flink, it can only be enabled on Kerberized environment."
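The fail-fast bootstrap check suggested here could be sketched as follows (hypothetical names; `securityEnabled` stands in for Hadoop's `UserGroupInformation.isSecurityEnabled`):

```scala
// Fail fast when the Flink doAs flag is enabled in a non-Kerberized
// environment, instead of silently launching without impersonation.
def validateFlinkDoAs(flinkDoAsEnabled: Boolean, securityEnabled: Boolean): Unit =
  if (flinkDoAsEnabled && !securityEnabled) {
    throw new IllegalStateException(
      "kyuubi.engine.flink.doAs.enabled can only be enabled on a Kerberized environment")
  }
```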
Hi @wForget, as we are about to cut the branch for 1.10.0, would you like to polish this PR or postpone it to the next milestone?
Yes, I hope to bring it into 1.10.0; I will complete it as soon as possible.
Is this a blocker issue with `KyuubiDelegationTokenProvider`?
No, we can enable
# 🔍 Description

## Issue References 🔗

Allow delegation tokens to be used and renewed by yarn resourcemanager. (used in proxy user mode of flink engine, address #6383 (comment))

## Describe Your Solution 🔧

Set hadoop fs delegation token renewer to empty.

## Types of changes 🔖

- [X] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6753 from wForget/renewer.

Closes #6753

f2e1f0a [wforget] Set hadoop fs delegation token renewer to empty

Authored-by: wforget <643348094@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
" it can only be enabled on Kerberized environment.") | ||
.version("1.10.0") | ||
.booleanConf | ||
.createWithDefault(false) |
can we let the `kyuubi.engine.flink.doAs.enabled` default value fall back to `kyuubi.engine.doAs.enabled`, and make the flink engine exclusively respect `kyuubi.engine.flink.doAs.enabled`?
ignore this.
  val ENGINE_FLINK_DOAS_GENERATE_TOKEN_FILE: ConfigEntry[Boolean] =
    buildConf("kyuubi.engine.flink.doAs.generateTokenFile")
      .doc("Whether to generate a hadoop token file for flink submit process." +
        s" We need to enable it when we set `$ENGINE_FLINK_DOAS_ENABLED=true`" +
-        s" We need to enable it when we set `$ENGINE_FLINK_DOAS_ENABLED=true`" +
+        s" We need to enable it when we set `${ENGINE_FLINK_DOAS_ENABLED.key}=true`" +
this should be an internal flag; we can remove it anytime.
I think the issue could also be addressed by YARN-10333 (correct me if I'm wrong), so the docs might be:
"When `${ENGINE_FLINK_DOAS_ENABLED.key}=true` and neither FLINK-35525 (Flink 1.20.0) nor YARN-10333 (Hadoop 3.4.0) is available, enable this configuration to generate a temporary HADOOP_TOKEN_FILE that will be picked up by the Flink engine bootstrap process."
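The availability check implied by that doc text could be sketched like this (a hypothetical helper, not part of the PR; it assumes plain `major.minor[.patch]` version strings):

```scala
// The temporary HADOOP_TOKEN_FILE workaround is only needed when doAs is on
// and neither FLINK-35525 (Flink >= 1.20.0) nor YARN-10333 (Hadoop >= 3.4.0)
// is available.
def needTokenFile(doAsEnabled: Boolean, flinkVersion: String, hadoopVersion: String): Boolean = {
  def atLeast(version: String, major: Int, minor: Int): Boolean = {
    val parts = version.split("\\.").take(2).map(_.toInt)
    parts(0) > major || (parts(0) == major && parts(1) >= minor)
  }
  doAsEnabled && !atLeast(flinkVersion, 1, 20) && !atLeast(hadoopVersion, 3, 4)
}
```

In practice the PR uses an explicit internal switch instead of version sniffing, since vendors may backport either patch to older versions.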
  private lazy val proxyUserEnable: Boolean = {
    doAsEnabled && conf.get(ENGINE_FLINK_DOAS_ENABLED)
... it can only be enabled on Kerberized environment.
I think the logic here should be
if (conf.get(ENGINE_FLINK_DOAS_ENABLED)) {
  if (!UserGroupInformation.isSecurityEnabled) {
    // log a warning message
    false
  } else {
    true
  }
} else {
  false
}
pom.xml
Outdated
@@ -84,6 +84,7 @@
    <module>kyuubi-util</module>
    <module>kyuubi-util-scala</module>
    <module>kyuubi-zookeeper</module>
    <module>extensions/flink/kyuubi-flink-token-provider</module>
move it to line 57
the code does not seem to mention YARN-10333?
TODO things:
lgtm, also cc @link3280
Thanks for the hard work and detailed review. Merged to master (1.10.0).
🔍 Description
Issue References 🔗
This pull request fixes #6368
Describe Your Solution 🔧
Support impersonation mode for the Flink SQL engine.
Types of changes 🔖
Test Plan 🧪
Behavior Without This Pull Request ⚰️
Behavior With This Pull Request 🎉
Test in hadoop-testing env.
Connection:
sql:
result:
launch engine command:
launch engine log:
jobmanager job:
taskmanager log:
Related Unit Tests
Checklist 📝
Be nice. Be informative.