
Commit c891e02

gatorsmile authored and HyukjinKwon committed
Revert "[SPARK-32481][CORE][SQL] Support truncate table to move data to trash"
### What changes were proposed in this pull request?

This reverts commit 065f173, which is not part of any released version. That is, this is an unreleased feature.

### Why are the changes needed?

I like the concept of Trash, but I think this PR might just resolve a very specific issue by introducing a mechanism without a proper design doc. This could make the usage more complex.

I think we need to consider the big picture. The trash directory is an important concept. If we decide to introduce it, we should consider all the code paths of Spark SQL that could delete data, not just TRUNCATE. We also need to consider the current behavior when the underlying file system does not provide the API `Trash.moveToAppropriateTrash`: is throwing an exception acceptable? What is the performance when users are on an object store instead of HDFS? Will it impact GDPR compliance?

In sum, I think we should not merge PR #29552 without a design doc and an implementation plan. That is why I reverted it before the code freeze of Spark 3.1.

### Does this PR introduce _any_ user-facing change?

Reverted the original commit.

### How was this patch tested?

The existing tests.

Closes #30463 from gatorsmile/revertSpark-32481.

Authored-by: Xiao Li <gatorsmile@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
1 parent 84e7036 · commit c891e02

File tree

4 files changed (+2, −119 lines)
  • core/src/main/scala/org/apache/spark/util
  • sql
    • catalyst/src/main/scala/org/apache/spark/sql/internal
    • core/src


core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 1 addition & 24 deletions
```diff
@@ -50,7 +50,7 @@ import com.google.common.net.InetAddresses
 import org.apache.commons.codec.binary.Hex
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FileUtil, Path, Trash}
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -269,29 +269,6 @@ private[spark] object Utils extends Logging {
     file.setExecutable(true, true)
   }
 
-  /**
-   * Move data to trash if 'spark.sql.truncate.trash.enabled' is true, else
-   * delete the data permanently. If move data to trash failed fallback to hard deletion.
-   */
-  def moveToTrashOrDelete(
-      fs: FileSystem,
-      partitionPath: Path,
-      isTrashEnabled: Boolean,
-      hadoopConf: Configuration): Boolean = {
-    if (isTrashEnabled) {
-      logDebug(s"Try to move data ${partitionPath.toString} to trash")
-      val isSuccess = Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
-      if (!isSuccess) {
-        logWarning(s"Failed to move data ${partitionPath.toString} to trash. " +
-          "Fallback to hard deletion")
-        return fs.delete(partitionPath, true)
-      }
-      isSuccess
-    } else {
-      fs.delete(partitionPath, true)
-    }
-  }
-
   /**
    * Create a directory given the abstract pathname
    * @return true, if the directory is successfully created; otherwise, return false.
```
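The removed helper was a thin wrapper over Hadoop's `org.apache.hadoop.fs.Trash` API. For reference, here is a minimal standalone sketch of the same try-trash-then-delete logic; the path and the `fs.trash.interval` value are illustrative, not taken from this commit:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, Trash}

object TrashOrDeleteSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Trash is effectively disabled unless fs.trash.interval (in minutes) is
    // positive; on HDFS the server-side value normally takes precedence.
    conf.set("fs.trash.interval", "60")

    val path = new Path("/tmp/example-table-dir") // illustrative path; must exist
    val fs: FileSystem = path.getFileSystem(conf)

    // moveToAppropriateTrash returns false when the move did not happen
    // (e.g. trash disabled), which is why the removed helper fell back
    // to a permanent, recursive delete.
    val moved = Trash.moveToAppropriateTrash(fs, path, conf)
    if (!moved) {
      fs.delete(path, true)
    }
  }
}
```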

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 0 additions & 14 deletions
```diff
@@ -2913,18 +2913,6 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
-  val TRUNCATE_TRASH_ENABLED =
-    buildConf("spark.sql.truncate.trash.enabled")
-      .doc("This configuration decides when truncating table, whether data files will be moved " +
-        "to trash directory or deleted permanently. The trash retention time is controlled by " +
-        "'fs.trash.interval', and in default, the server side configuration value takes " +
-        "precedence over the client-side one. Note that if 'fs.trash.interval' is non-positive, " +
-        "this will be a no-op and log a warning message. If the data fails to be moved to " +
-        "trash, Spark will turn to delete it permanently.")
-      .version("3.1.0")
-      .booleanConf
-      .createWithDefault(false)
-
   val DISABLED_JDBC_CONN_PROVIDER_LIST =
     buildConf("spark.sql.sources.disabledJdbcConnProviderList")
       .internal()
@@ -3577,8 +3565,6 @@
 
   def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)
 
-  def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)
-
   def disabledJdbcConnectionProviders: String = getConf(SQLConf.DISABLED_JDBC_CONN_PROVIDER_LIST)
 
   /** ********************** SQLConf functionality methods ************ */
```
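Before the revert, the flag would have been toggled like any other SQL conf. A hypothetical session-level usage sketch of the now-removed, never-released setting (the table name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

object TruncateTrashUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("truncate-trash-sketch")
      .master("local[*]")
      .getOrCreate()

    // With the reverted feature, this unreleased flag (default: false)
    // would have routed TRUNCATE's deletions through the Hadoop trash.
    // After the revert the key no longer exists and is simply ignored.
    spark.conf.set("spark.sql.truncate.trash.enabled", "true")

    spark.sql("CREATE TABLE tab1 (col INT) USING parquet")
    spark.sql("INSERT INTO tab1 SELECT 1")
    spark.sql("TRUNCATE TABLE tab1") // post-revert: always a hard delete

    spark.stop()
  }
}
```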

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 1 addition & 3 deletions
```diff
@@ -48,7 +48,6 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
-import org.apache.spark.util.Utils
 
 /**
  * A command to create a table with the same definition of the given existing table.
@@ -490,7 +489,6 @@ case class TruncateTableCommand(
     }
     val hadoopConf = spark.sessionState.newHadoopConf()
     val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
-    val isTrashEnabled = SQLConf.get.truncateTrashEnabled
     locations.foreach { location =>
       if (location.isDefined) {
         val path = new Path(location.get)
@@ -515,7 +513,7 @@ case class TruncateTableCommand(
           }
         }
 
-        Utils.moveToTrashOrDelete(fs, path, isTrashEnabled, hadoopConf)
+        fs.delete(path, true)
 
         // We should keep original permission/acl of the path.
         // For owner/group, only super-user can set it, for example on HDFS. Because
```
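After the revert, `TruncateTableCommand` is back to an unconditional recursive delete. A minimal sketch of the `FileSystem.delete` semantics now in effect (the path is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HardDeleteSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = new Path("/tmp/example-table-location") // illustrative
    val fs: FileSystem = path.getFileSystem(conf)

    // delete(path, recursive = true) removes the directory and all of its
    // contents permanently; no trash directory is involved, whatever
    // fs.trash.interval is set to.
    val deleted: Boolean = fs.delete(path, true)
    println(s"deleted: $deleted") // false if the path did not exist
  }
}
```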

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 0 additions & 78 deletions
```diff
@@ -3104,84 +3104,6 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       assert(spark.sessionState.catalog.isRegisteredFunction(rand))
     }
   }
-
-  test("SPARK-32481 Move data to trash on truncate table if enabled") {
-    val trashIntervalKey = "fs.trash.interval"
-    withTable("tab1") {
-      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
-        sql("CREATE TABLE tab1 (col INT) USING parquet")
-        sql("INSERT INTO tab1 SELECT 1")
-        // scalastyle:off hadoopconfiguration
-        val hadoopConf = spark.sparkContext.hadoopConfiguration
-        // scalastyle:on hadoopconfiguration
-        val originalValue = hadoopConf.get(trashIntervalKey, "0")
-        val tablePath = new Path(spark.sessionState.catalog
-          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
-
-        val fs = tablePath.getFileSystem(hadoopConf)
-        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
-        val trashPath = Path.mergePaths(trashCurrent, tablePath)
-        assume(
-          fs.mkdirs(trashPath) && fs.delete(trashPath, false),
-          "Trash directory could not be created, skipping.")
-        assert(!fs.exists(trashPath))
-        try {
-          hadoopConf.set(trashIntervalKey, "5")
-          sql("TRUNCATE TABLE tab1")
-        } finally {
-          hadoopConf.set(trashIntervalKey, originalValue)
-        }
-        assert(fs.exists(trashPath))
-        fs.delete(trashPath, true)
-      }
-    }
-  }
-
-  test("SPARK-32481 delete data permanently on truncate table if trash interval is non-positive") {
-    val trashIntervalKey = "fs.trash.interval"
-    withTable("tab1") {
-      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
-        sql("CREATE TABLE tab1 (col INT) USING parquet")
-        sql("INSERT INTO tab1 SELECT 1")
-        // scalastyle:off hadoopconfiguration
-        val hadoopConf = spark.sparkContext.hadoopConfiguration
-        // scalastyle:on hadoopconfiguration
-        val originalValue = hadoopConf.get(trashIntervalKey, "0")
-        val tablePath = new Path(spark.sessionState.catalog
-          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
-
-        val fs = tablePath.getFileSystem(hadoopConf)
-        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
-        val trashPath = Path.mergePaths(trashCurrent, tablePath)
-        assert(!fs.exists(trashPath))
-        try {
-          hadoopConf.set(trashIntervalKey, "0")
-          sql("TRUNCATE TABLE tab1")
-        } finally {
-          hadoopConf.set(trashIntervalKey, originalValue)
-        }
-        assert(!fs.exists(trashPath))
-      }
-    }
-  }
-
-  test("SPARK-32481 Do not move data to trash on truncate table if disabled") {
-    withTable("tab1") {
-      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") {
-        sql("CREATE TABLE tab1 (col INT) USING parquet")
-        sql("INSERT INTO tab1 SELECT 1")
-        val hadoopConf = spark.sessionState.newHadoopConf()
-        val tablePath = new Path(spark.sessionState.catalog
-          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
-
-        val fs = tablePath.getFileSystem(hadoopConf)
-        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
-        val trashPath = Path.mergePaths(trashCurrent, tablePath)
-        sql("TRUNCATE TABLE tab1")
-        assert(!fs.exists(trashPath))
-      }
-    }
-  }
 }
 
 object FakeLocalFsFileSystem {
```
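The removed tests asserted against Hadoop's per-user trash layout, where trashed files land under `<home>/.Trash/Current/<original path>`. A small sketch of how they computed the expected location (the table path here is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object TrashPathSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val tablePath = new Path("/user/alice/warehouse/tab1") // illustrative
    val fs: FileSystem = tablePath.getFileSystem(conf)

    // Same computation as the removed tests: the expected trash location
    // is the original absolute path re-rooted under <home>/.Trash/Current.
    val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
    val trashPath = Path.mergePaths(trashCurrent, tablePath)
    println(trashPath)
  }
}
```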
