[KYUUBI#5760] Support alter path-based table for Delta Lake in Authz (#5760)

# 🔍 Description
## Issue References 🔗

This pull request fixes #5757.

## Describe Your Solution 🔧

Add `uriDescs` to the command specs of the alter-table commands, so that for path-based Delta Lake tables the Authz plugin also extracts the table location and checks the corresponding URI (path) privilege.
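The change spans three files: the command spec JSON, the `TableCommands` spec builders, and the Delta Ranger test suite. As the Scala diff below shows, each affected alter-table command spec now carries a `UriDesc`, so the table's location is authorized as a URI in addition to the table identifier. A representative excerpt (all types here — `UriDesc`, `ResolvedTableURIExtractor`, `TableCommandSpec` — already exist in the plugin):

```scala
// Spec for Spark's AddColumns plan: besides the table descriptor, declare a
// URI descriptor so the resolved table's location is checked on write.
val AddColumns = {
  val cmd = "org.apache.spark.sql.catalyst.plans.logical.AddColumns"
  val uriDescs = Seq(UriDesc("child", classOf[ResolvedTableURIExtractor]))
  TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_ADDCOLS, uriDescs = uriDescs)
}
```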


## Types of changes 🔖
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

Alter commands (set properties, add/change/drop/rename/replace columns) on a path-based Delta table are authorized only through the table identifier; no write privilege on the table's path (URI) is required, so a user without access to the location can still alter the table.

#### Behavior With This Pull Request 🎉

The same commands now require the `write` privilege on the table's path, and unauthorized users are rejected with an `AccessControlException`.

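For example, excerpted from the new tests at the bottom of this diff (`someone` and `admin` are the suite's Ranger test users; only `admin` holds write access on the path):

```scala
// Rejected: someone lacks the write privilege on the table's path.
interceptEndsWith[AccessControlException](
  doAs(someone, sql(s"ALTER TABLE delta.`$path` ADD COLUMNS (age int)")))(
  s"does not have [write] privilege on [[$path, $path/]]")
// Accepted: admin holds the privilege, so the same statement succeeds.
doAs(admin, sql(s"ALTER TABLE delta.`$path` ADD COLUMNS (age int)"))
```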
#### Related Unit Tests

New cases in `DeltaCatalogRangerSparkExtensionSuite`:
- alter path-based table set properties
- alter path-based table add columns
- alter path-based table change column
- alter path-based table drop column
- alter path-based table rename column
- alter path-based table replace columns

---

# Checklists
## 📝 Author Self Checklist
- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested


**Be nice. Be informative.**
zml1206 authored Nov 29, 2023
1 parent 39de210 commit 5761e83
Showing 3 changed files with 150 additions and 14 deletions.
@@ -13,7 +13,12 @@
   } ],
   "opType" : "ALTERTABLE_ADDCOLS",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "child",
+    "fieldExtractor" : "ResolvedTableURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.AddPartitions",
   "tableDescs" : [ {
@@ -45,7 +50,12 @@
   } ],
   "opType" : "ALTERTABLE_ADDCOLS",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "child",
+    "fieldExtractor" : "ResolvedTableURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.AlterTable",
   "tableDescs" : [ {
@@ -61,7 +71,12 @@
   } ],
   "opType" : "ALTERTABLE_PROPERTIES",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "ident",
+    "fieldExtractor" : "IdentifierURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.AppendData",
   "tableDescs" : [ {
@@ -319,7 +334,12 @@
   } ],
   "opType" : "ALTERTABLE_ADDCOLS",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "child",
+    "fieldExtractor" : "ResolvedTableURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.DropPartitions",
   "tableDescs" : [ {
@@ -478,7 +498,12 @@
   } ],
   "opType" : "ALTERTABLE_RENAMECOL",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "child",
+    "fieldExtractor" : "ResolvedTableURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.RenamePartitions",
   "tableDescs" : [ {
@@ -526,7 +551,12 @@
   } ],
   "opType" : "ALTERTABLE_REPLACECOLS",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "child",
+    "fieldExtractor" : "ResolvedTableURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.ReplaceData",
   "tableDescs" : [ {
@@ -676,7 +706,12 @@
   } ],
   "opType" : "ALTERTABLE_PROPERTIES",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "ResolvedTableURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable",
   "tableDescs" : [ {
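The JSON above is the serialized form of the Scala specs changed below, and the two `fieldExtractor` values mark where the URI comes from. Roughly, judging only by the names in this diff (a sketch, not a definitive account of the extractors' internals):

```scala
// v2 alter plans (AddColumns, RenameColumn, ReplaceColumns, SetTableProperties)
// wrap a resolved table node; its catalog metadata carries the location.
UriDesc("child", classOf[ResolvedTableURIExtractor])
// The older AlterTable plan only carries an identifier; for a path-based table
// such as delta.`/some/path`, the identifier itself is the path.
UriDesc("ident", classOf[IdentifierURIExtractor])
```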
@@ -39,7 +39,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
   val AlterTable = {
     val cmd = "org.apache.spark.sql.catalyst.plans.logical.AlterTable"
     val tableDesc = TableDesc("ident", classOf[IdentifierTableExtractor])
-    TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES)
+    val uriDescs = Seq(UriDesc("ident", classOf[IdentifierURIExtractor]))
+    TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES, uriDescs = uriDescs)
   }
 
   val AlterTableAddColumns = {
@@ -51,7 +52,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
 
   val AddColumns = {
     val cmd = "org.apache.spark.sql.catalyst.plans.logical.AddColumns"
-    TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_ADDCOLS)
+    val uriDescs = Seq(UriDesc("child", classOf[ResolvedTableURIExtractor]))
+    TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_ADDCOLS, uriDescs = uriDescs)
   }
 
   val AlterColumn = {
@@ -66,12 +68,12 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
 
   val ReplaceColumns = {
     val cmd = "org.apache.spark.sql.catalyst.plans.logical.ReplaceColumns"
-    TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_REPLACECOLS)
+    AddColumns.copy(classname = cmd, opType = ALTERTABLE_REPLACECOLS)
   }
 
   val RenameColumn = {
     val cmd = "org.apache.spark.sql.catalyst.plans.logical.RenameColumn"
-    TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_RENAMECOL)
+    AddColumns.copy(classname = cmd, opType = ALTERTABLE_RENAMECOL)
   }
 
   val AlterTableAddPartition = {
@@ -638,7 +640,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
   val SetTableProperties = {
     val cmd = "org.apache.spark.sql.catalyst.plans.logical.SetTableProperties"
     val tableDesc = TableDesc("table", classOf[ResolvedTableTableExtractor])
-    TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES)
+    val uriDescs = Seq(UriDesc("table", classOf[ResolvedTableURIExtractor]))
+    TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES, uriDescs = uriDescs)
   }
 
   val AddArchivesCommand = {
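Note that `ReplaceColumns` and `RenameColumn` now derive from `AddColumns` via `copy`, inheriting its `resolvedTableDesc` and `uriDescs` instead of restating them. The `RenameColumn` spec is therefore equivalent to this expanded form (a sketch for illustration; the committed code keeps the `copy` call):

```scala
val RenameColumn = TableCommandSpec(
  "org.apache.spark.sql.catalyst.plans.logical.RenameColumn",
  Seq(resolvedTableDesc),
  ALTERTABLE_RENAMECOL,
  uriDescs = Seq(UriDesc("child", classOf[ResolvedTableURIExtractor])))
```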
@@ -25,7 +25,7 @@ import org.apache.kyuubi.plugin.spark.authz.AccessControlException
 import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
 import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
 import org.apache.kyuubi.plugin.spark.authz.ranger.DeltaCatalogRangerSparkExtensionSuite._
-import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.isSparkV32OrGreater
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.{isSparkV32OrGreater, isSparkV35OrGreater}
 import org.apache.kyuubi.tags.DeltaTest
 import org.apache.kyuubi.util.AssertionUtils._
 
@@ -41,6 +41,14 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
   val table1 = "table1_delta"
   val table2 = "table2_delta"
 
+  def propString(props: Map[String, String]): String =
+    if (props.isEmpty) ""
+    else {
+      props
+        .map { case (key, value) => s"'$key' = '$value'" }
+        .mkString("TBLPROPERTIES (", ",", ")")
+    }
+
   def createTableSql(namespace: String, table: String): String =
     s"""
       |CREATE TABLE IF NOT EXISTS $namespace.$table (
@@ -53,7 +61,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
       |PARTITIONED BY (gender)
       |""".stripMargin
 
-  def createPathBasedTableSql(path: Path): String =
+  def createPathBasedTableSql(path: Path, props: Map[String, String] = Map.empty): String =
     s"""
       |CREATE TABLE IF NOT EXISTS delta.`$path` (
       |  id INT,
@@ -63,6 +71,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
       |)
       |USING DELTA
       |PARTITIONED BY (gender)
+      |${propString(props)}
       |""".stripMargin
 
   override def withFixture(test: NoArgTest): Outcome = {
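The new `propString` helper and the extra `props` parameter let the drop/rename/replace-column tests below create the table with Delta column mapping enabled, which those operations require. For instance (a usage sketch; `path` is supplied by the test's temp directory):

```scala
// Renders: TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
propString(Map("delta.columnMapping.mode" -> "name"))
// An empty map renders "", so the generated CREATE TABLE stays valid either way.
createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name"))
```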
@@ -468,6 +477,95 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
       doAs(admin, sql(vacuumTableSql2))
     })
   }
+
+  test("alter path-based table set properties") {
+    withTempDir(path => {
+      doAs(admin, sql(createPathBasedTableSql(path)))
+      val setPropertiesSql = s"ALTER TABLE delta.`$path`" +
+        s" SET TBLPROPERTIES ('delta.appendOnly' = 'true')"
+      interceptEndsWith[AccessControlException](
+        doAs(someone, sql(setPropertiesSql)))(
+        s"does not have [write] privilege on [[$path, $path/]]")
+      doAs(admin, sql(setPropertiesSql))
+    })
+  }
+
+  test("alter path-based table add columns") {
+    withTempDir(path => {
+      doAs(admin, sql(createPathBasedTableSql(path)))
+      val addColumnsSql = s"ALTER TABLE delta.`$path` ADD COLUMNS (age int)"
+      interceptEndsWith[AccessControlException](
+        doAs(someone, sql(addColumnsSql)))(
+        s"does not have [write] privilege on [[$path, $path/]]")
+      doAs(admin, sql(addColumnsSql))
+    })
+  }
+
+  test("alter path-based table change column") {
+    withTempDir(path => {
+      doAs(admin, sql(createPathBasedTableSql(path)))
+      val changeColumnSql = s"ALTER TABLE delta.`$path`" +
+        s" CHANGE COLUMN gender gender STRING AFTER birthDate"
+      interceptEndsWith[AccessControlException](
+        doAs(someone, sql(changeColumnSql)))(
+        s"does not have [write] privilege on [[$path, $path/]]")
+      doAs(admin, sql(changeColumnSql))
+    })
+  }
+
+  test("alter path-based table drop column") {
+    assume(
+      isSparkV32OrGreater,
+      "alter table drop column is available in Delta Lake 1.2.0 and above")
+
+    withTempDir(path => {
+      doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name"))))
+      val dropColumnSql = s"ALTER TABLE delta.`$path` DROP COLUMN birthDate"
+      interceptEndsWith[AccessControlException](
+        doAs(someone, sql(dropColumnSql)))(
+        s"does not have [write] privilege on [[$path, $path/]]")
+      doAs(admin, sql(dropColumnSql))
+    })
+  }
+
+  test("alter path-based table rename column") {
+    assume(
+      isSparkV32OrGreater,
+      "alter table rename column is available in Delta Lake 1.2.0 and above")
+
+    withTempDir(path => {
+      doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name"))))
+      val renameColumnSql = s"ALTER TABLE delta.`$path`" +
+        s" RENAME COLUMN birthDate TO dateOfBirth"
+      interceptEndsWith[AccessControlException](
+        doAs(someone, sql(renameColumnSql)))(
+        s"does not have [write] privilege on [[$path, $path/]]")
+      doAs(admin, sql(renameColumnSql))
+    })
+  }
+
+  test("alter path-based table replace columns") {
+    withTempDir(path => {
+      assume(
+        isSparkV32OrGreater,
+        "alter table replace columns is not available in Delta Lake 1.0.1")
+
+      doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name"))))
+      val replaceColumnsSql = s"ALTER TABLE delta.`$path`" +
+        s" REPLACE COLUMNS (id INT, name STRING, gender STRING)"
+      interceptEndsWith[AccessControlException](
+        doAs(someone, sql(replaceColumnsSql)))(
+        s"does not have [write] privilege on [[$path, $path/]]")
+
+      // There was a bug before Delta Lake 3.0, it will throw AnalysisException message
+      // "Cannot drop column from a struct type with a single field:
+      // StructType(StructField(birthDate,TimestampType,true))".
+      // For details, see https://github.com/delta-io/delta/pull/1822
+      if (isSparkV35OrGreater) {
+        doAs(admin, sql(replaceColumnsSql))
+      }
+    })
+  }
 }
 
 object DeltaCatalogRangerSparkExtensionSuite {
