Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kyuubi.plugin.spark.authz.ranger

import org.apache.spark.sql.catalyst.expressions.ScalarSubquery
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
import org.apache.spark.sql.catalyst.rules.Rule

Expand All @@ -36,7 +37,14 @@ class RuleApplyPermanentViewMarker extends Rule[LogicalPlan] {
plan mapChildren {
case p: PermanentViewMarker => p
case permanentView: View if hasResolvedPermanentView(permanentView) =>
PermanentViewMarker(permanentView, permanentView.desc)
val resolvedSubquery = permanentView.transformAllExpressions {
case scalarSubquery: ScalarSubquery =>
// TODO: Currently, we do not do an auth check in the subquery
// as the main query part also secures it. But for performance consideration,
// we also pre-check it in subqueries and fail fast with negative privileges.
scalarSubquery.copy(plan = PermanentViewMarker(scalarSubquery.plan, null))
}
PermanentViewMarker(resolvedSubquery, resolvedSubquery.desc)
case other => apply(other)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,14 @@ class TableIdentifierTableExtractor extends TableExtractor {
*/
class CatalogTableTableExtractor extends TableExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
val catalogTable = v1.asInstanceOf[CatalogTable]
val identifier = catalogTable.identifier
val owner = Option(catalogTable.owner).filter(_.nonEmpty)
Some(Table(None, identifier.database, identifier.table, owner))
if (null == v1) {
None
} else {
val catalogTable = v1.asInstanceOf[CatalogTable]
val identifier = catalogTable.identifier
val owner = Option(catalogTable.owner).filter(_.nonEmpty)
Some(Table(None, identifier.database, identifier.table, owner))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kyuubi.plugin.spark.authz.util

import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

Expand All @@ -25,6 +26,10 @@ import org.apache.spark.sql.catalyst.rules.Rule
*/
class RuleEliminateViewMarker extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan.transformUp { case pvm: PermanentViewMarker => pvm.child }
plan.transformUp {
case pvm: PermanentViewMarker => pvm.child.transformAllExpressions {
case s: SubqueryExpression => s.withNewPlan(apply(s.plan))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -747,4 +747,48 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
assert(e.getMessage.contains(s"does not have [select] privilege on [$db1/$table/id]"))
}
}

test("[KYUUBI #5417] should not check dependent subquery plan privilege") {
val db1 = defaultDb
val table1 = "table1"
val table2 = "table2"
val view1 = "view1"
withCleanTmpResources(
Seq((s"$db1.$table1", "table"), (s"$db1.$table2", "table"), (s"$db1.$view1", "view"))) {
doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int, scope int)"))
doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table2 (id int, scope int)"))

val e1 = intercept[AccessControlException] {
doAs(
someone,
sql(
s"""
|WITH temp AS (
| SELECT max(scope) max_scope
| FROM $db1.$table1)
|SELECT id as new_id FROM $db1.$table2
|WHERE scope = (SELECT max_scope FROM temp)
|""".stripMargin).show())
}
// Will first check subquery privilege.
assert(e1.getMessage.contains(s"does not have [select] privilege on [$db1/$table1/scope]"))

doAs(
admin,
sql(
s"""
|CREATE VIEW $db1.$view1
|AS
|WITH temp AS (
| SELECT max(scope) max_scope
| FROM $db1.$table1)
|SELECT id as new_id FROM $db1.$table2
|WHERE scope = (SELECT max_scope FROM temp)
|""".stripMargin))
// Will just check permanent view privilege.
val e2 = intercept[AccessControlException](
doAs(someone, sql(s"SELECT * FROM $db1.$view1".stripMargin).show()))
assert(e2.getMessage.contains(s"does not have [select] privilege on [$db1/$view1/new_id]"))
}
}
}