
Commit d4dd1cb

zml1206 authored and yaooqinn committed
[KYUUBI #5707][AUTHZ] Support merge into path-based table for Delta Lake in Authz
# 🔍 Description

## Issue References 🔗

This pull request fixes #5707

## Describe Your Solution 🔧

Add `uriDescs` to the command spec of `org.apache.spark.sql.delta.commands.MergeIntoCommand`, so that the path of a path-based merge target is checked as a URI privilege.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

org.apache.kyuubi.plugin.spark.authz.ranger.DeltaCatalogRangerSparkExtensionSuite.test("merge into path-based table")

---

# Checklists

## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested

**Be nice. Be informative.**

Closes #5708 from zml1206/KYUUBI-5707.

Closes #5707

45ab4d4 [zml1206] fix
679f735 [zml1206] Support merge into path-based table for Delta Lake in Authz

Authored-by: zml1206 <zhuml1206@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
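For quick context, this is the kind of statement the change authorizes. A minimal sketch (the path, database, table, and columns are illustrative; it assumes a SparkSession already configured with Delta Lake and the authz extension):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: assumes delta.`/tmp/target_tbl` and source_db.source_tbl
// already exist in a Delta-enabled session.
val spark = SparkSession.builder().getOrCreate()
spark.sql(
  """
    |MERGE INTO delta.`/tmp/target_tbl` AS target
    |USING source_db.source_tbl AS source
    |ON target.id = source.id
    |WHEN MATCHED THEN UPDATE SET target.name = source.name
    |WHEN NOT MATCHED THEN INSERT (id, name) VALUES (source.id, source.name)
    |""".stripMargin)
```

Previously the path-based target matched neither a table descriptor nor a URI descriptor, so writes to it went unchecked; the spec change below closes that gap by requesting a write URI privilege on the target path.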
1 parent 71b0376 commit d4dd1cb

File tree: 5 files changed (+64, −13)


extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json

Lines changed: 5 additions & 1 deletion
```diff
@@ -2198,7 +2198,11 @@
     "fieldName" : "source",
     "fieldExtractor" : "LogicalPlanQueryExtractor"
   } ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "target",
+    "fieldExtractor" : "SubqueryAliasURIExtractor",
+    "isInput" : false
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.delta.commands.OptimizeTableCommand",
   "tableDescs" : [ {
```

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala

Lines changed: 6 additions & 3 deletions
```diff
@@ -257,9 +257,12 @@ class ResolvedIdentifierTableExtractor extends TableExtractor {
 class SubqueryAliasTableExtractor extends TableExtractor {
   override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
     v1.asInstanceOf[SubqueryAlias] match {
-      case SubqueryAlias(_, SubqueryAlias(identifier, _))
-          if !isPathIdentifier(identifier.name, spark) =>
-        lookupExtractor[StringTableExtractor].apply(spark, identifier.toString())
+      case SubqueryAlias(_, SubqueryAlias(identifier, _)) =>
+        if (isPathIdentifier(identifier.name, spark)) {
+          None
+        } else {
+          lookupExtractor[StringTableExtractor].apply(spark, identifier.toString())
+        }
       case SubqueryAlias(identifier, _) if !isPathIdentifier(identifier.name, spark) =>
         lookupExtractor[StringTableExtractor].apply(spark, identifier.toString())
       case _ => None
```

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala

Lines changed: 7 additions & 3 deletions
```diff
@@ -109,11 +109,15 @@ class IdentifierURIExtractor extends URIExtractor {

 class SubqueryAliasURIExtractor extends URIExtractor {
   override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = v1 match {
-    case SubqueryAlias(_, SubqueryAlias(identifier, _))
-        if isPathIdentifier(identifier.name, spark) =>
-      Seq(identifier.name).map(Uri)
+    case SubqueryAlias(_, SubqueryAlias(identifier, _)) =>
+      if (isPathIdentifier(identifier.name, spark)) {
+        Seq(identifier.name).map(Uri)
+      } else {
+        Nil
+      }
     case SubqueryAlias(identifier, _) if isPathIdentifier(identifier.name, spark) =>
       Seq(identifier.name).map(Uri)
+    case _ => Nil
   }
 }
```
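Taken together, the two extractor changes route path-based targets to the URI check instead of the table check. Moving the path test from a pattern guard into the case body matters because a failed guard falls through to the next case: under the old guard, a nested alias over a path skipped the first case and hit `case SubqueryAlias(identifier, _)`, where the outer alias name (e.g. `target`) could be misread as an identifier. The new trailing `case _ => Nil` also makes the URI match total for plans matching neither alias shape. A self-contained toy model (simplified stand-ins, not the Kyuubi source; the real methods take a SparkSession and Spark plan nodes):

```scala
object ExtractorSketch extends App {
  // Toy stand-ins for Spark's SubqueryAlias and Kyuubi's isPathIdentifier.
  sealed trait Plan
  case class SubqueryAlias(name: String, child: Plan) extends Plan
  case object Relation extends Plan

  final case class Uri(path: String)
  def isPathIdentifier(name: String): Boolean = name.startsWith("/")

  // Mirrors SubqueryAliasTableExtractor: path-based targets yield no table,
  // so no table-level privilege is requested for them.
  def extractTable(plan: Plan): Option[String] = plan match {
    case SubqueryAlias(_, SubqueryAlias(inner, _)) =>
      if (isPathIdentifier(inner)) None // explicit None stops the fall-through
      else Some(inner)
    case SubqueryAlias(name, _) if !isPathIdentifier(name) => Some(name)
    case _ => None
  }

  // Mirrors SubqueryAliasURIExtractor: path-based targets yield a write URI;
  // the trailing default keeps the match total instead of throwing MatchError.
  def extractUris(plan: Plan): Seq[Uri] = plan match {
    case SubqueryAlias(_, SubqueryAlias(inner, _)) =>
      if (isPathIdentifier(inner)) Seq(Uri(inner)) else Nil
    case SubqueryAlias(name, _) if isPathIdentifier(name) => Seq(Uri(name))
    case _ => Nil
  }

  // MERGE INTO delta.`/tmp/t` AS target => nested alias over a path identifier.
  val pathTarget = SubqueryAlias("target", SubqueryAlias("/tmp/t", Relation))
  assert(extractTable(pathTarget).isEmpty)              // no table privilege
  assert(extractUris(pathTarget) == Seq(Uri("/tmp/t"))) // write URI instead
}
```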

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala

Lines changed: 1 addition & 6 deletions
```diff
@@ -41,13 +41,8 @@ object DeltaCommands extends CommandSpecs[TableCommandSpec] {

   val MergeIntoCommand = {
     val cmd = "org.apache.spark.sql.delta.commands.MergeIntoCommand"
-    val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
-    val tableDesc = TableDesc(
-      "target",
-      classOf[SubqueryAliasTableExtractor],
-      actionTypeDesc = Some(actionTypeDesc))
     val queryDesc = QueryDesc("source")
-    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDesc))
+    DeleteCommand.copy(classname = cmd, queryDescs = Seq(queryDesc))
   }

   val OptimizeTableCommand = {
```
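The rewritten generator reuses `DeleteCommand`'s spec via case-class `copy`: the merge command inherits the same target table and URI descriptors and only swaps the classname and adds the source query. A simplified stand-in showing the reuse pattern (the real `TableCommandSpec` uses richer descriptor types than plain strings):

```scala
// Simplified stand-in for TableCommandSpec; illustration only.
final case class TableCommandSpec(
    classname: String,
    tableDescs: Seq[String],
    queryDescs: Seq[String] = Nil,
    uriDescs: Seq[String] = Nil)

val DeleteCommand = TableCommandSpec(
  classname = "org.apache.spark.sql.delta.commands.DeleteCommand",
  tableDescs = Seq("target"),
  uriDescs = Seq("target"))

// Same target descriptors as DELETE; only the classname and source query differ.
val MergeIntoCommand = DeleteCommand.copy(
  classname = "org.apache.spark.sql.delta.commands.MergeIntoCommand",
  queryDescs = Seq("source"))
```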

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala

Lines changed: 45 additions & 0 deletions
```diff
@@ -387,6 +387,51 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
       }
     }
   }
+
+  test("merge into path-based table") {
+    withSingleCallEnabled {
+      withCleanTmpResources(Seq(
+        (s"$namespace1.$table2", "table"),
+        (s"$namespace1", "database"))) {
+        doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+        doAs(admin, sql(createTableSql(namespace1, table2)))
+        withTempDir(path => {
+          doAs(admin, sql(createPathBasedTableSql(path)))
+          val mergeIntoSql =
+            s"""
+               |MERGE INTO delta.`$path` AS target
+               |USING $namespace1.$table2 AS source
+               |ON target.id = source.id
+               |WHEN MATCHED THEN
+               |  UPDATE SET
+               |    id = source.id,
+               |    name = source.name,
+               |    gender = source.gender,
+               |    birthDate = source.birthDate
+               |WHEN NOT MATCHED
+               |  THEN INSERT (
+               |    id,
+               |    name,
+               |    gender,
+               |    birthDate
+               |  )
+               |  VALUES (
+               |    source.id,
+               |    source.name,
+               |    source.gender,
+               |    source.birthDate
+               |  )
+               |""".stripMargin
+          interceptContains[AccessControlException](
+            doAs(someone, sql(mergeIntoSql)))(
+            s"does not have [select] privilege on [$namespace1/$table2/id," +
+              s"$namespace1/$table2/name,$namespace1/$table2/gender," +
+              s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]")
+          doAs(admin, sql(mergeIntoSql))
+        })
+      }
+    }
+  }
 }

 object DeltaCatalogRangerSparkExtensionSuite {
```
