Skip to content

[SPARK-30312][SQL][2.4] Preserve path permission and acl when truncate table #27173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,14 @@ object SQLConf {
"turning the flag on provides a way for these sources to see these partitionBy columns.")
.booleanConf
.createWithDefault(false)

val TRUNCATE_TABLE_IGNORE_PERMISSION_ACL =
buildConf("spark.sql.truncateTable.ignorePermissionAcl")
.internal()
.doc("When set to true, TRUNCATE TABLE command will not try to set back original " +
"permission and ACLs when re-creating the table/partition paths.")
.booleanConf
.createWithDefault(false)
}

/**
Expand Down Expand Up @@ -1983,6 +1991,9 @@ class SQLConf extends Serializable with Logging {

def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)

def truncateTableIgnorePermissionAcl: Boolean =
getConf(SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.util.Try
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileContext, FsConstants, Path}
import org.apache.hadoop.fs.permission.{AclEntry, FsPermission}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
Expand Down Expand Up @@ -457,13 +458,58 @@ case class TruncateTableCommand(
partLocations
}
val hadoopConf = spark.sessionState.newHadoopConf()
val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
locations.foreach { location =>
if (location.isDefined) {
val path = new Path(location.get)
try {
val fs = path.getFileSystem(hadoopConf)

// Not all fs impl. support these APIs.
var optPermission: Option[FsPermission] = None
var optAcls: Option[java.util.List[AclEntry]] = None
if (!ignorePermissionAcl) {
val fileStatus = fs.getFileStatus(path)
try {
optPermission = Some(fileStatus.getPermission())
} catch {
case NonFatal(_) => // do nothing
}

try {
optAcls = Some(fs.getAclStatus(path).getEntries)
} catch {
case NonFatal(_) => // do nothing
}
}

fs.delete(path, true)
// We should keep original permission/acl of the path.
// For owner/group, only super-user can set it, for example on HDFS. Because
// current user can delete the path, we assume the user/group is correct or not an issue.
fs.mkdirs(path)
if (!ignorePermissionAcl) {
optPermission.foreach { permission =>
try {
fs.setPermission(path, permission)
} catch {
case NonFatal(e) =>
throw new SecurityException(
s"Failed to set original permission $permission back to " +
s"the created path: $path. Exception: ${e.getMessage}")
}
}
optAcls.foreach { acls =>
try {
fs.setAcl(path, acls)
} catch {
case NonFatal(e) =>
throw new SecurityException(
s"Failed to set original ACL $acls back to " +
s"the created path: $path. Exception: ${e.getMessage}")
}
}
}
} catch {
case NonFatal(e) =>
throw new AnalysisException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import java.io.{File, PrintWriter}
import java.net.URI
import java.util.Locale

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, AclStatus, FsAction, FsPermission}
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
Expand Down Expand Up @@ -1935,6 +1936,60 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}

test("SPARK-30312: truncate table - keep acl/permission") {
import testImplicits._
val ignorePermissionAcl = Seq(true, false)

ignorePermissionAcl.foreach { ignore =>
withSQLConf(
"fs.file.impl" -> classOf[FakeLocalFsFileSystem].getName,
"fs.file.impl.disable.cache" -> "true",
SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL.key -> ignore.toString) {
withTable("tab1") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
checkAnswer(spark.table("tab1"), Row(1))

val tablePath = new Path(spark.sessionState.catalog
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)

val hadoopConf = spark.sessionState.newHadoopConf()
val fs = tablePath.getFileSystem(hadoopConf)
val fileStatus = fs.getFileStatus(tablePath);

fs.setPermission(tablePath, new FsPermission("777"))
assert(fileStatus.getPermission().toString() == "rwxrwxrwx")

// Set ACL to table path.
val customAcl = new java.util.ArrayList[AclEntry]()
customAcl.add(new AclEntry.Builder()
.setType(AclEntryType.USER)
.setScope(AclEntryScope.ACCESS)
.setPermission(FsAction.READ).build())
fs.setAcl(tablePath, customAcl)
assert(fs.getAclStatus(tablePath).getEntries().get(0) == customAcl.get(0))

sql("TRUNCATE TABLE tab1")
assert(spark.table("tab1").collect().isEmpty)

val fileStatus2 = fs.getFileStatus(tablePath)
if (ignore) {
assert(fileStatus2.getPermission().toString() == "rwxr-xr-x")
} else {
assert(fileStatus2.getPermission().toString() == "rwxrwxrwx")
}
val aclEntries = fs.getAclStatus(tablePath).getEntries()
if (ignore) {
assert(aclEntries.size() == 0)
} else {
assert(aclEntries.size() == 1)
assert(aclEntries.get(0) == customAcl.get(0))
}
}
}
}
}

test("create temporary view with mismatched schema") {
withTable("tab1") {
spark.range(10).write.saveAsTable("tab1")
Expand Down Expand Up @@ -2752,3 +2807,25 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
}

object FakeLocalFsFileSystem {
var aclStatus = new AclStatus.Builder().build()
}

// A fake test local filesystem used to test ACL. It keeps a ACL status. If deletes
// a path of this filesystem, it will clean up the ACL status. Note that for test purpose,
// it has only one ACL status for all paths.
class FakeLocalFsFileSystem extends RawLocalFileSystem {
import FakeLocalFsFileSystem._

override def delete(f: Path, recursive: Boolean): Boolean = {
aclStatus = new AclStatus.Builder().build()
super.delete(f, recursive)
}

override def getAclStatus(path: Path): AclStatus = aclStatus

override def setAcl(path: Path, aclSpec: java.util.List[AclEntry]): Unit = {
aclStatus = new AclStatus.Builder().addEntries(aclSpec).build()
}
}