Skip to content

Commit

Permalink
[KYUUBI #5479][AUTHZ] Support Hudi CallProcedureHoodieCommand for stored procedures
Browse files Browse the repository at this point in the history

### _Why are the changes needed?_
To close #5479
Support Hudi CallProcedureHoodieCommand; see the stored-procedure grammar at https://hudi.apache.org/docs/procedures/

- CallProcedureHoodieCommand: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CallProcedureHoodieCommand.scala

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_
No

Closes #5502 from AngersZhuuuu/KYUUBI-5479.

Closes #5479

583df35 [Angerszhuuuu] Update tableExtractors.scala
6b51a9d [Bowen Liang] refactor extractors in more scala way, and use lookupExtractor for reusing StringTableExtractor singleton
cde7992 [Angerszhuuuu] [KYUUBI #5479][AUTHZ] Support Hudi CallProcedureHoodieCommand

Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: Bowen Liang <liangbowen@gf.com.cn>
Signed-off-by: Cheng Pan <chengpan@apache.org>
  • Loading branch information
2 people authored and davidyuan1223 committed Oct 26, 2023
1 parent bc3fcbb commit 1dc264a
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ExpressionSeqTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiDataSourceV2RelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoTargetTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiCallProcedureInputTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiCallProcedureOutputTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.IdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalRelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,37 @@
} ],
"opType" : "ALTERTABLE_PROPERTIES",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.CallProcedureHoodieCommand",
"tableDescs" : [ {
"fieldName" : "clone",
"fieldExtractor" : "HudiCallProcedureInputTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "OTHER"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : true,
"setCurrentDatabaseIfMissing" : true
}, {
"fieldName" : "clone",
"fieldExtractor" : "HudiCallProcedureOutputTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : true
} ],
"opType" : "QUERY",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.CompactionHoodieTableCommand",
"tableDescs" : [ {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
package org.apache.kyuubi.plugin.spark.authz.serde

import java.util.{Map => JMap}
import java.util.LinkedHashMap

import scala.collection.JavaConverters._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.types.DataType
import org.apache.spark.unsafe.types.UTF8String

import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
import org.apache.kyuubi.util.reflect.ReflectUtils._
Expand Down Expand Up @@ -266,3 +269,225 @@ class HudiMergeIntoTargetTableExtractor extends TableExtractor {
}
}
}

/**
 * Base extractor that resolves the Hudi table(s) referenced by a stored-procedure
 * invocation (`CALL ...`, planned by Hudi as `CallProcedureHoodieCommand`).
 *
 * Hudi's procedure classes are not on this plugin's compile-time classpath, so the
 * procedure definition and its bound arguments are inspected reflectively via
 * `invokeAs`. Concrete subclasses pick the input-side or output-side parameter name
 * out of [[procedureArgsInputOutputPairs]] and delegate to [[extractTableIdentifier]].
 */
abstract class HudiCallProcedureTableExtractor extends TableExtractor {

  /**
   * Reflectively resolves the string value of the named table parameter from a Hudi
   * procedure call.
   *
   * @param procedure the Hudi `Procedure` instance; expected to expose a `parameters`
   *                  array whose elements expose `name` and `dataType`
   * @param args the bound call arguments; expected to expose a `map` from parameter
   *             name to positional index, and an `internalRow` holding the values
   * @param tableParameterKey the declared parameter name carrying the table identifier
   * @return the table identifier, or `None` when the caller did not pass this
   *         parameter in the CALL statement (index lookup yields [[INVALID_INDEX]])
   * @throws IllegalArgumentException if the procedure declares no parameter with
   *                                  the given name
   */
  protected def extractTableIdentifier(
      procedure: AnyRef,
      args: AnyRef,
      tableParameterKey: String): Option[String] = {
    // Locate the declared parameter so we can read its data type below.
    val tableIdentifierParameter =
      invokeAs[Array[AnyRef]](procedure, "parameters")
        .find(invokeAs[String](_, "name").equals(tableParameterKey))
        .getOrElse(throw new IllegalArgumentException(s"Could not find param $tableParameterKey"))
    // Position of the argument the user actually passed; INVALID_INDEX when omitted.
    val tableIdentifierParameterIndex = invokeAs[LinkedHashMap[String, Int]](args, "map")
      .getOrDefault(tableParameterKey, INVALID_INDEX)
    tableIdentifierParameterIndex match {
      // Uppercase stable identifier: this matches the INVALID_INDEX constant (-1),
      // i.e. the parameter was not supplied in the CALL.
      case INVALID_INDEX =>
        None
      case argsIndex =>
        val dataType = invokeAs[DataType](tableIdentifierParameter, "dataType")
        val row = invokeAs[InternalRow](args, "internalRow")
        // nullable = true: the accessor may return null for an unset slot.
        val tableName = InternalRow.getAccessor(dataType, true)(row, argsIndex)
        // NOTE(review): assumes every parameter mapped below is string-typed
        // (UTF8String in the row); a non-string value would fail this cast, and a
        // null slot would NPE on toString before Option wraps it — TODO confirm.
        Option(tableName.asInstanceOf[UTF8String].toString)
    }
  }

  // Names of the procedure parameters that carry the input (read) table and/or
  // the output (written) table, if any. Either side may be absent.
  case class ProcedureArgsInputOutputPair(
      input: Option[String] = None,
      output: Option[String] = None)

  // Package that hosts all of Hudi's built-in Spark SQL procedures.
  protected val PROCEDURE_CLASS_PATH = "org.apache.spark.sql.hudi.command.procedures"

  // Sentinel index returned when a parameter was not supplied in the CALL statement.
  protected val INVALID_INDEX = -1

  // These pairs are used to get the procedure input/output args which user passed in call command.
  protected val procedureArgsInputOutputPairs: Map[String, ProcedureArgsInputOutputPair] = Map(
    (
      s"$PROCEDURE_CLASS_PATH.ArchiveCommitsProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.CommitsCompareProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.CopyToTableProcedure",
      ProcedureArgsInputOutputPair(
        input = Some("table"),
        output = Some("new_table"))),
    (
      s"$PROCEDURE_CLASS_PATH.CopyToTempViewProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.CreateMetadataTableProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.CreateSavepointProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.DeleteMarkerProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.DeleteMetadataTableProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.DeleteSavepointProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ExportInstantsProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.HdfsParquetImportProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      // Touches no table at all; both sides intentionally absent.
      s"$PROCEDURE_CLASS_PATH.HelpProcedure",
      ProcedureArgsInputOutputPair()),
    (
      s"$PROCEDURE_CLASS_PATH.HiveSyncProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.InitMetadataTableProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.RepairAddpartitionmetaProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.RepairCorruptedCleanFilesProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.RepairDeduplicateProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.RepairMigratePartitionMetaProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.RepairOverwriteHoodiePropsProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.RollbackToInstantTimeProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.RollbackToSavepointProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      // The same table is both read and rewritten by these "run" procedures.
      s"$PROCEDURE_CLASS_PATH.RunBootstrapProcedure",
      ProcedureArgsInputOutputPair(
        input = Some("table"),
        output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.RunCleanProcedure",
      ProcedureArgsInputOutputPair(
        input = Some("table"),
        output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.RunClusteringProcedure",
      ProcedureArgsInputOutputPair(
        input = Some("table"),
        output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.RunCompactionProcedure",
      ProcedureArgsInputOutputPair(
        input = Some("table"),
        output = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowArchivedCommitsProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowBootstrapMappingProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowClusteringProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowCommitExtraMetadataProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowCommitFilesProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowCommitPartitionsProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowCommitWriteStatsProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowCompactionProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowFileSystemViewProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      // Operates on a filesystem path, not a table.
      s"$PROCEDURE_CLASS_PATH.ShowFsPathDetailProcedure",
      ProcedureArgsInputOutputPair()),
    (
      s"$PROCEDURE_CLASS_PATH.ShowHoodieLogFileMetadataProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowHoodieLogFileRecordsProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowInvalidParquetProcedure",
      ProcedureArgsInputOutputPair()),
    (
      s"$PROCEDURE_CLASS_PATH.ShowMetadataTableFilesProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowMetadataTablePartitionsProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowMetadataTableStatsProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowRollbacksProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowSavepointsProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ShowTablePropertiesProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.StatsFileSizeProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.StatsWriteAmplificationProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))),
    (
      s"$PROCEDURE_CLASS_PATH.UpgradeOrDowngradeProcedure",
      ProcedureArgsInputOutputPair(output = Some("table"))),
    (
      // Reads the source table and syncs into a distinct destination table.
      s"$PROCEDURE_CLASS_PATH.ValidateHoodieSyncProcedure",
      ProcedureArgsInputOutputPair(
        input = Some("src_table"),
        output = Some("dst_table"))),
    (
      s"$PROCEDURE_CLASS_PATH.ValidateMetadataTableFilesProcedure",
      ProcedureArgsInputOutputPair(input = Some("table"))))
}

/**
 * Extracts the output (written) table of a Hudi `CallProcedureHoodieCommand`.
 *
 * Returns `None` when the procedure is unknown, declares no output table, the
 * caller omitted the table argument, or the string could not be resolved to a
 * table.
 */
class HudiCallProcedureOutputTableExtractor
  extends HudiCallProcedureTableExtractor {
  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
    val procedure = invokeAs[AnyRef](v1, "procedure")
    val args = invokeAs[AnyRef](v1, "args")
    // Chain with flatMap instead of `.get`/`.orNull`: the original threw
    // NoSuchElementException when the table argument was omitted from the CALL
    // (extractTableIdentifier returns None), and could yield Some(null) when the
    // string extractor found no table.
    procedureArgsInputOutputPairs.get(procedure.getClass.getName)
      .flatMap(_.output)
      .flatMap(extractTableIdentifier(procedure, args, _))
      .flatMap(tableName => lookupExtractor[StringTableExtractor].apply(spark, tableName))
  }
}

/**
 * Extracts the input (read) table of a Hudi `CallProcedureHoodieCommand`.
 *
 * Returns `None` when the procedure is unknown, declares no input table, the
 * caller omitted the table argument, or the string could not be resolved to a
 * table.
 */
class HudiCallProcedureInputTableExtractor
  extends HudiCallProcedureTableExtractor {
  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
    val procedure = invokeAs[AnyRef](v1, "procedure")
    val args = invokeAs[AnyRef](v1, "args")
    // Chain with flatMap instead of `.get`/`.orNull`: the original threw
    // NoSuchElementException when the table argument was omitted from the CALL
    // (extractTableIdentifier returns None), and could yield Some(null) when the
    // string extractor found no table.
    procedureArgsInputOutputPairs.get(procedure.getClass.getName)
      .flatMap(_.input)
      .flatMap(extractTableIdentifier(procedure, args, _))
      .flatMap(tableName => lookupExtractor[StringTableExtractor].apply(spark, tableName))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,31 @@ object HudiCommands {
TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDescs))
}

val CallProcedureHoodieCommand = {
val cmd = "org.apache.spark.sql.hudi.command.CallProcedureHoodieCommand"
TableCommandSpec(
cmd,
Seq(
TableDesc(
"clone",
classOf[HudiCallProcedureInputTableExtractor],
actionTypeDesc = Some(ActionTypeDesc(actionType = Some(OTHER))),
isInput = true,
setCurrentDatabaseIfMissing = true),
TableDesc(
"clone",
classOf[HudiCallProcedureOutputTableExtractor],
actionTypeDesc = Some(ActionTypeDesc(actionType = Some(UPDATE))),
setCurrentDatabaseIfMissing = true)))
}

val data: Array[TableCommandSpec] = Array(
AlterHoodieTableAddColumnsCommand,
AlterHoodieTableChangeColumnCommand,
AlterHoodieTableDropPartitionCommand,
AlterHoodieTableRenameCommand,
AlterTableCommand,
CallProcedureHoodieCommand,
CreateHoodieTableAsSelectCommand,
CreateHoodieTableCommand,
CreateHoodieTableLikeCommand,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,4 +472,54 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
}
}

test("CallProcedureHoodieCommand") {
withSingleCallEnabled {
withCleanTmpResources(Seq(
(s"$namespace1.$table1", "table"),
(s"$namespace1.$table2", "table"),
(namespace1, "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
doAs(
admin,
sql(
s"""
|CREATE TABLE IF NOT EXISTS $namespace1.$table1(id int, name string, city string)
|USING HUDI
|OPTIONS (
| type = 'cow',
| primaryKey = 'id',
| 'hoodie.datasource.hive_sync.enable' = 'false'
|)
|PARTITIONED BY(city)
|""".stripMargin))
doAs(
admin,
sql(
s"""
|CREATE TABLE IF NOT EXISTS $namespace1.$table2(id int, name string, city string)
|USING HUDI
|OPTIONS (
| type = 'cow',
| primaryKey = 'id',
| 'hoodie.datasource.hive_sync.enable' = 'false'
|)
|PARTITIONED BY(city)
|""".stripMargin))

val copy_to_table =
s"CALL copy_to_table(table => '$namespace1.$table1', new_table => '$namespace1.$table2')"
interceptContains[AccessControlException] {
doAs(someone, sql(copy_to_table))
}(s"does not have [select] privilege on [$namespace1/$table1]")
doAs(admin, sql(copy_to_table))

val show_table_properties = s"CALL show_table_properties(table => '$namespace1.$table1')"
interceptContains[AccessControlException] {
doAs(someone, sql(show_table_properties))
}(s"does not have [select] privilege on [$namespace1/$table1]")
doAs(admin, sql(show_table_properties))
}
}
}
}

0 comments on commit 1dc264a

Please sign in to comment.