Commit e0bdf80

[SC-5603] Make CTAS work with ACL-enabled DB Spark
## What changes were proposed in this pull request?

ACL-enabled Spark currently throws a table-not-found exception when we issue a CTAS command. This bug is caused by a number of factors:

1. CTAS is composed of two stages: creating the table and inserting data into it.
2. The order of these commands has been inverted to make sure we never expose a partially written table to an end user.
3. We check ACLs for the CTAS command, and also for its child commands. ACL checking fails because we cannot find the table when executing the insert (it has not been created yet).

This PR fixes the problem by ACL checking only the CTAS command itself, not its children. The same fix is applied to all other `RunnableCommand`s. The only exception is `ExplainCommand`; in that case we **want** to execute `CheckAnalysis` on its child.

This has been implemented by adding a trusted mode to `CheckPermissions`: we do not check permissions while trusted mode is active. The trusted flag is thread local and can be set by calling the `CheckPermissions.trusted` method. Note that a creative developer can abuse this by either using Java (the trusted method is package-local to `com.databricks`) or some light reflection.

The other part of the solution is a planning rule added to `QueryExecution`, which wraps a `RunnableCommand` in a `TrustedRunnableCommand`. The downside is that this introduces a change in an Apache package, which might create merge conflicts.

Finally, I have added an sbt task, `sparkShellAcl`, which starts the Spark shell with ACLs enabled; this makes testing easier. Remember to set `sc.setLocalProperty("spark.databricks.token", "super")` when you start the shell (if you don't, every command will fail). You can change users by modifying the `spark.databricks.token` local Spark property.

## How was this patch tested?

Existing tests, plus an integration test added to the ACL end-to-end tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes apache#184 from hvanhovell/SC-5603.
1 parent 747a191 commit e0bdf80
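For illustration, a session in the ACL-enabled shell might look like the following. This is a sketch: the `super` token matches the end-to-end test added below, while `anotherUsersToken` is a hypothetical value.

```scala
// Sketch of a spark-shell session started via the new sparkShellAcl task.
// Without a token every command fails, so set it first; "super" is the token
// used by the end-to-end test below, anotherUsersToken is hypothetical.
sc.setLocalProperty("spark.databricks.token", "super")

// CTAS now works: only the outer command is ACL-checked, while the internal
// create and insert steps run in trusted mode.
spark.sql("create table db1.test using parquet as select * from range(100)")

// Changing users is just a matter of changing the local property.
sc.setLocalProperty("spark.databricks.token", "anotherUsersToken")
```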

8 files changed (+147, -2 lines)


project/SparkBuild.scala

Lines changed: 11 additions & 0 deletions
```diff
@@ -416,6 +416,8 @@ object SparkBuild extends PomBuild {
       """.stripMargin)
   val sparkSql = taskKey[Unit]("starts the spark sql CLI.")
 
+  val sparkShellAcl = taskKey[Unit]("start a spark-shell with ACL support.")
+
   enable(Seq(
     connectInput in run := true,
     fork := true,
@@ -427,6 +429,15 @@
       (runMain in Compile).toTask(" org.apache.spark.repl.Main -usejavacp").value
     },
 
+    sparkShellAcl := {
+      (runMain in Compile).toTask(
+        " -Dspark.sql.extensions=com.databricks.sql.acl.HiveAclExtensions" +
+        " -Dspark.databricks.acl.provider=com.databricks.sql.acl.ReflectionBackedAclProvider" +
+        " -Dspark.databricks.acl.client=com.databricks.sql.acl.SharedAclBackend" +
+        " -Dspark.databricks.acl.enabled=true" +
+        " org.apache.spark.repl.Main -usejavacp").value
+    },
+
     sparkPackage := {
       import complete.DefaultParsers._
       val packages :: className :: otherArgs = spaceDelimited("<group:artifact:version> <MainClass> [args]").parsed.toList
```
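With this task in place, an ACL-enabled shell can be launched straight from the sbt prompt with `sparkShellAcl`; the session sketch under the commit message above shows the token setup required once the shell is up.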

sql/core/src/main/scala/com/databricks/sql/acl/CheckPermissions.scala

Lines changed: 18 additions & 0 deletions
```diff
@@ -32,6 +32,9 @@ class CheckPermissions(catalog: PublicCatalog, aclClient: AclClient)
   val permissionChecker = new PermissionChecker(aclClient)
 
   def apply(plan: LogicalPlan): Unit = {
+    if (CheckPermissions.isTrusted) {
+      return
+    }
     getRequestsToCheck(plan).foreach { request =>
       if (!permissionChecker(request)) {
         throw new SecurityException(toErrorMessageForFailedRequest(request))
@@ -373,3 +376,18 @@
     s"User does not ${actionString} ${request.securable.prettyString}"
   }
 }
+
+private[databricks] object CheckPermissions {
+  private val trusted = new ThreadLocal[Boolean] {
+    override def initialValue(): Boolean = false
+  }
+
+  private[databricks] def isTrusted: Boolean = trusted.get()
+
+  private[databricks] def trusted[T](block: => T): T = {
+    trusted.set(true)
+    try block finally {
+      trusted.set(false)
+    }
+  }
+}
```
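For reference, here is a minimal, dependency-free sketch of the thread-local guard introduced above. The `TrustedMode` and `flag` names are illustrative only, not from the commit:

```scala
// Illustrative stand-in for the CheckPermissions companion object above.
object TrustedMode {
  // One flag per thread, so trusting a command on one thread does not
  // disable checks for queries running concurrently on other threads.
  private val flag = new ThreadLocal[Boolean] {
    override def initialValue(): Boolean = false
  }

  def isTrusted: Boolean = flag.get()

  // Runs `block` with checks disabled on the current thread; the flag is
  // cleared again even if the block throws. Note it is set and cleared rather
  // than saved and restored, so a nested call would end trusted mode early --
  // the commit's code behaves the same way.
  def trusted[T](block: => T): T = {
    flag.set(true)
    try block finally flag.set(false)
  }

  def main(args: Array[String]): Unit = {
    assert(!isTrusted)
    trusted { assert(isTrusted) } // permission checks would be skipped here
    assert(!isTrusted)            // cleared after the block
  }
}
```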

sql/core/src/test/scala/com/databricks/sql/acl/SharedAclBackend.scala renamed to sql/core/src/main/scala/com/databricks/sql/acl/SharedAclBackend.scala

Lines changed: 5 additions & 0 deletions
```diff
@@ -138,6 +138,11 @@ class SharedAclBackend {
     }
     log.info(s"modify[token=$token, modifications=${modifications.mkString("[", ",", "]")}]")
   }
+
+  /**
+   * Get a token from the Databricks DriverLocal. This is not implemented.
+   */
+  def getTokenFromLocalContext(): Option[String] = None
 }
 
 object SharedAclBackend {
```
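The move from the `src/test` tree to the `src/main` tree appears to be what lets the new sbt task's `-Dspark.databricks.acl.client=com.databricks.sql.acl.SharedAclBackend` setting resolve the class on the shell's runtime classpath.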
sql/core/src/main/scala/com/databricks/sql/acl/TrustedRunnableCommand.scala (new file)

Lines changed: 54 additions & 0 deletions

```scala
/*
 * Copyright (C) 2016 Databricks, Inc.
 *
 * Portions of this software incorporate or are derived from software contained within Apache Spark,
 * and this modified software differs from the Apache Spark software provided under the Apache
 * License, Version 2.0, a copy of which you may obtain at
 * http://www.apache.org/licenses/LICENSE-2.0
 */
package com.databricks.sql.acl

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command._

/**
 * A runnable command that we trust once its own permissions have checked out. In practice this
 * means that we do not check permissions for the commands that the wrapped command issues when
 * it is executed. An example of the latter is a CreateTableAsSelect command, which issues both
 * a create and an insert command.
 */
case class TrustedRunnableCommand(cmd: RunnableCommand) extends RunnableCommand {
  assert(cmd != null)

  override def output: Seq[Attribute] = cmd.output

  override def run(sparkSession: SparkSession): Seq[Row] = {
    CheckPermissions.trusted(cmd.run(sparkSession))
  }

  override def generateTreeString(
      depth: Int,
      lastChildren: Seq[Boolean],
      builder: StringBuilder,
      verbose: Boolean,
      prefix: String): StringBuilder = {
    cmd.generateTreeString(depth, lastChildren, builder, verbose, prefix)
  }
}

/**
 * This rule wraps a RunnableCommand with a TrustedRunnableCommand before the command is executed.
 */
object MakeRunnableCommandTrusted extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan match {
    case ExecutedCommandExec(_: TrustedRunnableCommand) => plan
    case ExecutedCommandExec(_: ExplainCommand) => plan
    case ExecutedCommandExec(_: ShowTablesCommand) => plan
    case ExecutedCommandExec(_: DescribeTableCommand) => plan
    case ExecutedCommandExec(cmd) => ExecutedCommandExec(TrustedRunnableCommand(cmd))
    case _ => plan
  }
}
```
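The useful property of this rule is that it is idempotent and leaves the exempt commands (`ExplainCommand`, `ShowTablesCommand`, `DescribeTableCommand`) untouched. A dependency-free sketch of the same rewrite pattern, with illustrative names:

```scala
// Stand-ins for the Spark command types; only the shape of the rewrite matters.
object MakeTrustedSketch {
  sealed trait Cmd
  case class Plain(name: String) extends Cmd    // an ordinary RunnableCommand
  case class Explain(inner: Cmd) extends Cmd    // exempt, like ExplainCommand
  case class Trusted(inner: Cmd) extends Cmd    // like TrustedRunnableCommand

  // Already-wrapped and exempt commands pass through; anything else is
  // wrapped exactly once, which makes the rule safe to re-apply.
  def makeTrusted(cmd: Cmd): Cmd = cmd match {
    case t: Trusted => t
    case e: Explain => e
    case other      => Trusted(other)
  }

  def main(args: Array[String]): Unit = {
    assert(makeTrusted(Plain("ctas")) == Trusted(Plain("ctas")))
    // Idempotent: re-running the rule never double-wraps.
    assert(makeTrusted(makeTrusted(Plain("ctas"))) == Trusted(Plain("ctas")))
    assert(makeTrusted(Explain(Plain("ctas"))) == Explain(Plain("ctas")))
  }
}
```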

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 6 additions & 1 deletion
```diff
@@ -245,7 +245,12 @@ class Dataset[T] private[sql](
    */
   private[sql] def showString(_numRows: Int, truncate: Int = 20): String = {
     val numRows = _numRows.max(0)
-    val takeResult = toDF().take(numRows + 1)
+    val takeResult = withAction("show", limit(numRows + 1).queryExecution) { plan =>
+      val encoder = RowEncoder(schema).resolveAndBind(
+        logicalPlan.output,
+        sparkSession.sessionState.analyzer)
+      plan.executeCollect().map(encoder.fromRow)
+    }
     val hasMoreData = takeResult.length > numRows
     val data = takeResult.take(numRows)
```
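The `show()` path now collects through `withAction` on the prepared physical plan of `limit(numRows + 1)` rather than via `toDF().take(numRows + 1)`; presumably this is so that displayed results are produced by the same execution path that applies the new `MakeRunnableCommandTrusted` preparation rule.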

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution
 import java.nio.charset.StandardCharsets
 import java.sql.Timestamp
 
+import com.databricks.sql.acl.MakeRunnableCommandTrusted
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -101,7 +103,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     EnsureRequirements(sparkSession.sessionState.conf),
     CollapseCodegenStages(sparkSession.sessionState.conf),
     ReuseExchange(sparkSession.sessionState.conf),
-    ReuseSubquery(sparkSession.sessionState.conf))
+    ReuseSubquery(sparkSession.sessionState.conf),
+    MakeRunnableCommandTrusted)
 
   protected def stringOrError[A](f: => A): String =
     try f.toString catch { case e: AnalysisException => e.toString }
```
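Note where the rule is appended: the preparation rules are applied in order, each one transforming the plan produced by the previous one, so `MakeRunnableCommandTrusted` sees the fully prepared physical plan. A toy sketch of that folding, with stand-in rule names:

```scala
object PreparationOrderSketch {
  type Plan = String

  // Stand-ins for the physical preparation rules; each rule sees the output
  // of the previous one, so appending a rule means it runs last.
  val preparations: Seq[Plan => Plan] = Seq(
    p => s"reuseSubquery($p)",
    p => s"makeTrusted($p)"
  )

  def main(args: Array[String]): Unit = {
    val prepared = preparations.foldLeft("ctasPlan": Plan)((plan, rule) => rule(plan))
    assert(prepared == "makeTrusted(reuseSubquery(ctasPlan))")
  }
}
```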
New file (ACL end-to-end test input queries)

Lines changed: 7 additions & 0 deletions

```sql
create database db1;

create table db1.test using parquet as select * from range(100);

drop table db1.test;

drop database db1;
```
New file (ACL end-to-end test expected output, generated by ThriftEndToEndAclTestSuite)

Lines changed: 42 additions & 0 deletions

```sql
-- Automatically generated by ThriftEndToEndAclTestSuite
-- Number of queries: 4


-- !query 0
create database db1
-- !query 0 token
super
-- !query 0 schema
struct<Result:string>
-- !query 0 output



-- !query 1
create table db1.test using parquet as select * from range(100)
-- !query 1 token
super
-- !query 1 schema
struct<Result:string>
-- !query 1 output



-- !query 2
drop table db1.test
-- !query 2 token
super
-- !query 2 schema
struct<Result:string>
-- !query 2 output



-- !query 3
drop database db1
-- !query 3 token
super
-- !query 3 schema
struct<Result:string>
-- !query 3 output
```
