@@ -21,6 +21,7 @@

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;

@@ -33,6 +34,9 @@
* add an array of partitions and any data they contain to the table
* ${@link #dropPartitions}:
* remove an array of partitions and any data they contain from the table
* ${@link #purgePartitions}:
* remove an array of partitions and any data they contain from the table, bypassing
* the trash even if it is supported
*
* @since 3.1.0
*/
@@ -82,4 +86,23 @@ void createPartitions(
* @return true if partitions were deleted, false if any partition does not exist
*/
boolean dropPartitions(InternalRow[] idents);

/**
* Drop an array of partitions atomically from the table, and completely remove the
* partitions' data, bypassing the trash even if it is supported.
* <p>
* If any partition doesn't exist,
* the purgePartitions operation must be safely rolled back.
*
* @param idents an array of partition identifiers
* @return true if partitions were deleted, false if any partition does not exist
* @throws NoSuchPartitionException If any partition identifier to purge doesn't exist
* @throws UnsupportedOperationException If partition purging is not supported
*
* @since 3.2.0
*/
default boolean purgePartitions(InternalRow[] idents)
throws NoSuchPartitionException, UnsupportedOperationException {
throw new UnsupportedOperationException("Partition purge is not supported");
}
}
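A connector opts in to PURGE by overriding this default. Below is a minimal sketch in Scala, assuming the InMemoryAtomicPartitionTable test helper exercised by the suites in this PR is extensible and lives in org.apache.spark.sql.connector; the subclass name is hypothetical. Because the in-memory table keeps no trash, a purge can simply delegate to dropPartitions, which is already atomic (it drops nothing and returns false when any identifier is missing), so the rollback contract holds.

import java.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.InMemoryAtomicPartitionTable
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// Hypothetical purge-capable variant of the in-memory atomic table.
class PurgeCapableAtomicPartitionTable(
    name: String,
    schema: StructType,
    partitioning: Array[Transform],
    properties: util.Map[String, String])
  extends InMemoryAtomicPartitionTable(name, schema, partitioning, properties) {

  // There is no trash to bypass in memory, so purging and dropping coincide;
  // dropPartitions is atomic here, which satisfies the rollback contract above.
  override def purgePartitions(idents: Array[InternalRow]): Boolean =
    dropPartitions(idents)
}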
@@ -36,6 +36,9 @@
* add a partition and any data it contains to the table
* ${@link #dropPartition}:
* remove a partition and any data it contains from the table
* ${@link #purgePartition}:
* remove a partition and any data it contains from the table, bypassing the trash
* even if it is supported
* ${@link #replacePartitionMetadata}:
* point a partition to a new location, which will swap one location's data for the other
*
@@ -72,6 +75,22 @@ void createPartition(
*/
boolean dropPartition(InternalRow ident);

/**
* Drop a partition from the table and completely remove the partition's data, bypassing
* the trash even if it is supported.
*
* @param ident a partition identifier
* @return true if a partition was deleted, false if no partition exists for the identifier
* @throws NoSuchPartitionException If the partition identifier to purge doesn't exist
* @throws UnsupportedOperationException If partition purging is not supported
*
* @since 3.2.0
*/
default boolean purgePartition(InternalRow ident)
throws NoSuchPartitionException, UnsupportedOperationException {
throw new UnsupportedOperationException("Partition purge is not supported");
}

/**
* Test whether a partition exists using an {@link InternalRow ident} from the table.
*
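The non-atomic interface is opted in the same way. A matching sketch under the same assumptions, this time over the InMemoryPartitionTable helper:

import java.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.InMemoryPartitionTable
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// Hypothetical purge-capable variant of the non-atomic in-memory table.
class PurgeCapablePartitionTable(
    name: String,
    schema: StructType,
    partitioning: Array[Transform],
    properties: util.Map[String, String])
  extends InMemoryPartitionTable(name, schema, partitioning, properties) {

  // A purge here is just a permanent drop, since nothing is ever trashed.
  override def purgePartition(ident: InternalRow): Boolean =
    dropPartition(ident)
}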
@@ -110,6 +110,20 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
assert(!hasPartitions(partTable))
}

test("purgePartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
partTable.createPartitions(
partIdents,
Array(new util.HashMap[String, String](), new util.HashMap[String, String]()))
val errMsg = intercept[UnsupportedOperationException] {
partTable.purgePartitions(partIdents)
}.getMessage
assert(errMsg.contains("purge is not supported"))
}

test("dropPartitions failed if partition not exists") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
@@ -85,6 +85,16 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
assert(!hasPartitions(partTable))
}

test("purgePartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
val errMsg = intercept[UnsupportedOperationException] {
partTable.purgePartition(InternalRow.apply("3"))
}.getMessage
assert(errMsg.contains("purge is not supported"))
}

test("replacePartitionMetadata") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
@@ -28,7 +28,8 @@ import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement
case class AlterTableDropPartitionExec(
table: SupportsPartitionManagement,
partSpecs: Seq[ResolvedPartitionSpec],
ignoreIfNotExists: Boolean) extends V2CommandExec {
ignoreIfNotExists: Boolean,
purge: Boolean) extends V2CommandExec {
import DataSourceV2Implicits._

override def output: Seq[Attribute] = Seq.empty
@@ -45,9 +46,11 @@ case class AlterTableDropPartitionExec(
existsPartIdents match {
case Seq() => // Nothing will be done
case Seq(partIdent) =>
table.dropPartition(partIdent)
if (purge) table.purgePartition(partIdent) else table.dropPartition(partIdent)
case _ if table.isInstanceOf[SupportsAtomicPartitionManagement] =>
table.asAtomicPartitionable.dropPartitions(existsPartIdents.toArray)
val idents = existsPartIdents.toArray
val atomicTable = table.asAtomicPartitionable
if (purge) atomicTable.purgePartitions(idents) else atomicTable.dropPartitions(idents)
case _ =>
throw new UnsupportedOperationException(
s"Nonatomic partition table ${table.name()} can not drop multiple partitions.")
@@ -338,9 +338,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
table, parts.asResolvedPartitionSpecs, ignoreIfExists) :: Nil

case AlterTableDropPartition(
ResolvedTable(_, _, table: SupportsPartitionManagement), parts, ignoreIfNotExists, _) =>
ResolvedTable(_, _, table: SupportsPartitionManagement), parts, ignoreIfNotExists, purge) =>
AlterTableDropPartitionExec(
table, parts.asResolvedPartitionSpecs, ignoreIfNotExists) :: Nil
table, parts.asResolvedPartitionSpecs, ignoreIfNotExists, purge) :: Nil

case AlterTableRenamePartition(_: ResolvedTable, _: ResolvedPartitionSpec, _) =>
throw new AnalysisException(
@@ -21,6 +21,16 @@ import org.apache.spark.sql.execution.command

trait AlterTableDropPartitionSuiteBase extends command.AlterTableDropPartitionSuiteBase {
override protected val notFullPartitionSpecErr = "The following partitions not found in table"

test("purge partition data") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
sql(s"ALTER TABLE $t ADD PARTITION (id = 1)")
checkPartitions(t, Map("id" -> "1"))
sql(s"ALTER TABLE $t DROP PARTITION (id = 1) PURGE")
checkPartitions(t) // no partitions
}
}
}

class AlterTableDropPartitionSuite
@@ -35,4 +35,19 @@ class AlterTableDropPartitionSuite
assert(errMsg.contains("can not alter partitions"))
}
}

test("purge partition data") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
sql(s"ALTER TABLE $t ADD PARTITION (id=1)")
try {
val errMsg = intercept[UnsupportedOperationException] {
sql(s"ALTER TABLE $t DROP PARTITION (id=1) PURGE")
}.getMessage
assert(errMsg.contains("purge is not supported"))
} finally {
sql(s"ALTER TABLE $t DROP PARTITION (id=1)")
}
}
}
}