Skip to content

Commit

Permalink
Support Delete/Insert/Update table command for Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
zml1206 committed Nov 2, 2023
1 parent ea9a78f commit e2a3fe0
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedIdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedTableTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.StringTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.SubqueryAliasTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.TableIdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.TableTableExtractor
Original file line number Diff line number Diff line change
Expand Up @@ -1976,4 +1976,68 @@
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.delta.commands.DeleteCommand",
"tableDescs" : [ {
"fieldName" : "catalogTable",
"fieldExtractor" : "CatalogTableOptionTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
}, {
"fieldName" : "target",
"fieldExtractor" : "SubqueryAliasTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.delta.commands.UpdateCommand",
"tableDescs" : [ {
"fieldName" : "catalogTable",
"fieldExtractor" : "CatalogTableOptionTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
}, {
"fieldName" : "target",
"fieldExtractor" : "SubqueryAliasTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.DataType
import org.apache.spark.unsafe.types.UTF8String

Expand Down Expand Up @@ -234,6 +235,19 @@ class ResolvedIdentifierTableExtractor extends TableExtractor {
}
}

/**
* org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
*/
class SubqueryAliasTableExtractor extends TableExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
invokeAs[AnyRef](v1, "child") match {
case lr: LogicalRelation =>
lookupExtractor[LogicalRelationTableExtractor].apply(spark, lr)
case _ => None
}
}
}

/**
* org.apache.spark.sql.connector.catalog.Table
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.spark.authz.gen

import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
import org.apache.kyuubi.plugin.spark.authz.serde._

object DeltaCommands extends CommandSpecs[TableCommandSpec] {

val DeleteCommand = {
val cmd = "org.apache.spark.sql.delta.commands.DeleteCommand"
val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
val tableDesc = TableDesc(
"catalogTable",
classOf[CatalogTableOptionTableExtractor],
actionTypeDesc = Some(actionTypeDesc))
TableCommandSpec(cmd, Seq(tableDesc))
val targetDesc = TableDesc(
"target",
classOf[SubqueryAliasTableExtractor],
actionTypeDesc = Some(actionTypeDesc))
TableCommandSpec(cmd, Seq(tableDesc, targetDesc))
}

val UpdateCommand = {
val cmd = "org.apache.spark.sql.delta.commands.UpdateCommand"
DeleteCommand.copy(classname = cmd)
}

override def specs: Seq[TableCommandSpec] = Seq(
DeleteCommand,
UpdateCommand)
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class JsonSpecFileGenerator extends AnyFunSuite {
// scalastyle:on
test("check spec json files") {
writeCommandSpecJson("database", Seq(DatabaseCommands))
writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands, HudiCommands))
writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands, HudiCommands, DeltaCommands))
writeCommandSpecJson("function", Seq(FunctionCommands))
writeCommandSpecJson("scan", Seq(Scans))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
val table1 = "table1_delta"
val table2 = "table2_delta"

def createTableSql(namespace: String, table: String): String =
s"""
|CREATE TABLE IF NOT EXISTS $namespace.$table (
| id INT,
| name STRING,
| gender STRING,
| birthDate TIMESTAMP
|)
|USING DELTA
|PARTITIONED BY (gender)
|""".stripMargin

override def withFixture(test: NoArgTest): Outcome = {
test()
}
Expand Down Expand Up @@ -66,35 +78,17 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
s"""
|CREATE TABLE IF NOT EXISTS $namespace1.$table1 (
| id INT,
| firstName STRING,
| middleName STRING,
| lastName STRING,
| name STRING,
| gender STRING,
| birthDate TIMESTAMP,
| ssn STRING,
| salary INT
| birthDate TIMESTAMP
|) USING DELTA
|""".stripMargin
interceptContains[AccessControlException] {
doAs(someone, sql(createNonPartitionTableSql))
}(s"does not have [create] privilege on [$namespace1/$table1]")
doAs(admin, sql(createNonPartitionTableSql))

val createPartitionTableSql =
s"""
|CREATE TABLE IF NOT EXISTS $namespace1.$table2 (
| id INT,
| firstName STRING,
| middleName STRING,
| lastName STRING,
| gender STRING,
| birthDate TIMESTAMP,
| ssn STRING,
| salary INT
|)
|USING DELTA
|PARTITIONED BY (gender)
|""".stripMargin
val createPartitionTableSql = createTableSql(namespace1, table2)
interceptContains[AccessControlException] {
doAs(someone, sql(createPartitionTableSql))
}(s"does not have [create] privilege on [$namespace1/$table2]")
Expand All @@ -109,13 +103,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
s"""
|CREATE OR REPLACE TABLE $namespace1.$table1 (
| id INT,
| firstName STRING,
| middleName STRING,
| lastName STRING,
| name STRING,
| gender STRING,
| birthDate TIMESTAMP,
| ssn STRING,
| salary INT
| birthDate TIMESTAMP
|) USING DELTA
|""".stripMargin
interceptContains[AccessControlException] {
Expand All @@ -128,23 +118,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
test("alter table") {
withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
doAs(
admin,
sql(
s"""
|CREATE TABLE IF NOT EXISTS $namespace1.$table1 (
| id INT,
| firstName STRING,
| middleName STRING,
| lastName STRING,
| gender STRING,
| birthDate TIMESTAMP,
| ssn STRING,
| salary INT
|)
|USING DELTA
|PARTITIONED BY (gender)
|""".stripMargin))
doAs(admin, sql(createTableSql(namespace1, table1)))

// add columns
interceptContains[AccessControlException](
Expand All @@ -164,7 +138,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
doAs(
someone,
sql(s"ALTER TABLE $namespace1.$table1" +
s" REPLACE COLUMNS (id INT, firstName STRING)")))(
s" REPLACE COLUMNS (id INT, name STRING)")))(
s"does not have [alter] privilege on [$namespace1/$table1]")

// rename column
Expand All @@ -189,6 +163,62 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
s"does not have [alter] privilege on [$namespace1/$table1]")
}
}

test("delete from table") {
withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
doAs(admin, sql(createTableSql(namespace1, table1)))
interceptContains[AccessControlException](
doAs(someone, sql(s"DELETE FROM $namespace1.$table1 WHERE birthDate < '1955-01-01'")))(
s"does not have [update] privilege on [$namespace1/$table1]")
}
}

test("insert table") {
withSingleCallEnabled {
withCleanTmpResources(Seq(
(s"$namespace1.$table1", "table"),
(s"$namespace1.$table2", "table"),
(s"$namespace1", "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
doAs(admin, sql(createTableSql(namespace1, table1)))
doAs(admin, sql(createTableSql(namespace1, table2)))

// insert into
interceptContains[AccessControlException](
doAs(
someone,
sql(s"INSERT INTO $namespace1.$table1" +
s" SELECT * FROM $namespace1.$table2")))(
s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," +
s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
s" [update] privilege on [$namespace1/$table1]")

// insert overwrite
interceptContains[AccessControlException](
doAs(
someone,
sql(s"INSERT OVERWRITE $namespace1.$table1" +
s" SELECT * FROM $namespace1.$table2")))(
s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," +
s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
s" [update] privilege on [$namespace1/$table1]")
}
}
}

test("update table") {
withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
doAs(admin, sql(createTableSql(namespace1, table1)))
interceptContains[AccessControlException](
doAs(
someone,
sql(s"UPDATE $namespace1.$table1" +
s" SET gender = 'Female' WHERE gender = 'F'")))(
s"does not have [update] privilege on [$namespace1/$table1]")
}
}
}

object DeltaCatalogRangerSparkExtensionSuite {
Expand Down

0 comments on commit e2a3fe0

Please sign in to comment.