Skip to content

Commit 62c52b2

Browse files
committed
Support optimize path-based table for Delta Lake in Authz
1 parent d4dd1cb commit 62c52b2

File tree

6 files changed

+77
-12
lines changed

6 files changed

+77
-12
lines changed

extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.URIExtractor

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ org.apache.kyuubi.plugin.spark.authz.serde.IdentifierURIExtractor
2323
org.apache.kyuubi.plugin.spark.authz.serde.PartitionLocsSeqURIExtractor
2424
org.apache.kyuubi.plugin.spark.authz.serde.PropertiesLocationUriExtractor
2525
org.apache.kyuubi.plugin.spark.authz.serde.PropertiesPathUriExtractor
26+
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedTableURIExtractor
2627
org.apache.kyuubi.plugin.spark.authz.serde.StringSeqURIExtractor
2728
org.apache.kyuubi.plugin.spark.authz.serde.StringURIExtractor
2829
org.apache.kyuubi.plugin.spark.authz.serde.SubqueryAliasURIExtractor
30+
org.apache.kyuubi.plugin.spark.authz.serde.TableIdentifierOptionURIExtractor
31+
org.apache.kyuubi.plugin.spark.authz.serde.TableIdentifierURIExtractor
2932
org.apache.kyuubi.plugin.spark.authz.serde.TableSpecURIExtractor

extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2226,7 +2226,19 @@
22262226
} ],
22272227
"opType" : "ALTERTABLE_COMPACT",
22282228
"queryDescs" : [ ],
2229-
"uriDescs" : [ ]
2229+
"uriDescs" : [ {
2230+
"fieldName" : "child",
2231+
"fieldExtractor" : "ResolvedTableURIExtractor",
2232+
"isInput" : false
2233+
}, {
2234+
"fieldName" : "tableId",
2235+
"fieldExtractor" : "TableIdentifierOptionURIExtractor",
2236+
"isInput" : false
2237+
}, {
2238+
"fieldName" : "path",
2239+
"fieldExtractor" : "StringURIExtractor",
2240+
"isInput" : false
2241+
} ]
22302242
}, {
22312243
"classname" : "org.apache.spark.sql.delta.commands.UpdateCommand",
22322244
"tableDescs" : [ {

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717

1818
package org.apache.kyuubi.plugin.spark.authz.serde
1919

20-
import java.util.{Map => JMap}
21-
import java.util.LinkedHashMap
20+
import java.util.{LinkedHashMap, Map => JMap}
2221

2322
import scala.collection.JavaConverters._
2423

@@ -81,14 +80,18 @@ object TableExtractor {
8180
class TableIdentifierTableExtractor extends TableExtractor {
8281
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
8382
val identifier = v1.asInstanceOf[TableIdentifier]
84-
val owner =
85-
try {
86-
val catalogTable = spark.sessionState.catalog.getTableMetadata(identifier)
87-
Option(catalogTable.owner).filter(_.nonEmpty)
88-
} catch {
89-
case _: Exception => None
90-
}
91-
Some(Table(None, identifier.database, identifier.table, owner))
83+
if (isPathIdentifier(identifier.table, spark)) {
84+
None
85+
} else {
86+
val owner =
87+
try {
88+
val catalogTable = spark.sessionState.catalog.getTableMetadata(identifier)
89+
Option(catalogTable.owner).filter(_.nonEmpty)
90+
} catch {
91+
case _: Exception => None
92+
}
93+
Some(Table(None, identifier.database, identifier.table, owner))
94+
}
9295
}
9396
}
9497

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.kyuubi.plugin.spark.authz.serde
1919

2020
import org.apache.spark.sql.SparkSession
21+
import org.apache.spark.sql.catalyst.TableIdentifier
2122
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
2223
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
2324
import org.apache.spark.sql.connector.catalog.Identifier
@@ -133,3 +134,26 @@ class DataSourceV2RelationURIExtractor extends URIExtractor {
133134
}
134135
}
135136
}
137+
138+
/**
 * Extracts URIs from a `ResolvedTable` logical plan node by reading its
 * `identifier` field reflectively (the concrete class varies across Spark
 * versions) and delegating to [[IdentifierURIExtractor]].
 */
class ResolvedTableURIExtractor extends URIExtractor {
  override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
    val resolvedIdentifier = invokeAs[AnyRef](v1, "identifier")
    lookupExtractor[IdentifierURIExtractor].apply(spark, resolvedIdentifier)
  }
}
144+
145+
/**
 * Extracts a URI from a `TableIdentifier` when the identifier actually names
 * a path (e.g. `delta.`/some/path``); catalog-backed identifiers yield no URI.
 */
class TableIdentifierURIExtractor extends URIExtractor {
  override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
    v1 match {
      case id: TableIdentifier if isPathIdentifier(id.table, spark) => Seq(Uri(id.table))
      case _ => Nil
    }
  }
}
152+
153+
/**
 * Extracts URIs from an `Option[TableIdentifier]` field: unwraps the option
 * and delegates to [[TableIdentifierURIExtractor]]; `None` (or any other
 * value) produces no URIs.
 */
class TableIdentifierOptionURIExtractor extends URIExtractor {
  override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
    v1 match {
      case Some(id: TableIdentifier) =>
        lookupExtractor[TableIdentifierURIExtractor].apply(spark, id)
      case _ => Nil
    }
  }
}

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ object DeltaCommands extends CommandSpecs[TableCommandSpec] {
4949
val cmd = "org.apache.spark.sql.delta.commands.OptimizeTableCommand"
5050
val childDesc = TableDesc("child", classOf[ResolvedTableTableExtractor])
5151
val tableDesc = TableDesc("tableId", classOf[TableIdentifierOptionTableExtractor])
52-
TableCommandSpec(cmd, Seq(childDesc, tableDesc), ALTERTABLE_COMPACT)
52+
val uriDescs = Seq(
53+
UriDesc("child", classOf[ResolvedTableURIExtractor]),
54+
UriDesc("tableId", classOf[TableIdentifierOptionURIExtractor]),
55+
UriDesc("path", classOf[StringURIExtractor]))
56+
TableCommandSpec(cmd, Seq(childDesc, tableDesc), ALTERTABLE_COMPACT, uriDescs = uriDescs)
5357
}
5458

5559
val VacuumTableCommand = {

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,25 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
432432
}
433433
}
434434
}
435+
436+
test("optimize path-based table") {
  // OPTIMIZE requires Spark 3.2+ for Delta Lake.
  assume(isSparkV32OrGreater)

  withTempDir { path =>
    doAs(admin, sql(createPathBasedTableSql(path)))
    // Both path-addressing syntaxes must be denied for `someone` (write
    // privilege on the path) and allowed for `admin`.
    Seq(s"OPTIMIZE delta.`$path`", s"OPTIMIZE '$path'").foreach { optimizeSql =>
      interceptContains[AccessControlException](
        doAs(someone, sql(optimizeSql)))(
        s"does not have [write] privilege on [[$path, $path/]]")
      doAs(admin, sql(optimizeSql))
    }
  }
}
435454
}
436455

437456
object DeltaCatalogRangerSparkExtensionSuite {

0 commit comments

Comments
 (0)