Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.paimon.table.{FileStoreTable, Table}
import org.apache.paimon.types.RowType
import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
Expand All @@ -33,7 +34,7 @@ import java.util.{Map => JMap, Objects}

import scala.collection.JavaConverters._

trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement with Logging {

val table: Table

Expand All @@ -44,19 +45,42 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
/**
 * Converts Spark catalyst partition rows into Paimon partition spec maps
 * (partition column name -> string value).
 *
 * Supports *partial* partition specs: a row may carry fewer fields than the
 * table's partition keys (e.g. `PARTITION (dt=20240101)` on a table
 * partitioned by dt/hh/mm). In that case only the leading partition columns
 * are used, in partition-key order.
 *
 * @param rows catalyst rows whose fields are a prefix of the partition keys
 * @return one name->value map per input row
 * @throws UnsupportedOperationException if the table is not a FileStoreTable
 */
private def toPaimonPartitions(rows: Array[InternalRow]): Array[java.util.Map[String, String]] = {
  table match {
    case fileStoreTable: FileStoreTable =>
      // Hoisted out of the per-row loop: these are identical for every row.
      val partitionKeys = table.partitionKeys().asScala.toSeq
      val partitionDefaultName = fileStoreTable.coreOptions().partitionDefaultName()
      val legacyPartitionName = CoreOptions.fromMap(table.options()).legacyPartitionName

      rows.map {
        r =>
          val partitionFieldCount = r.numFields
          // A spec with more fields than partition keys is malformed input.
          require(
            partitionFieldCount <= partitionKeys.length,
            s"Partition values length $partitionFieldCount exceeds partition keys " +
              s"${partitionKeys.mkString("[", ", ", "]")}."
          )
          // Partial specs bind to the leading partition columns only.
          val partitionNames = partitionKeys.take(partitionFieldCount)
          // Reuse the precomputed full-schema row type when the spec is full;
          // otherwise project the table row type down to the supplied columns.
          val currentPartitionRowType =
            if (partitionFieldCount == partitionRowType.getFieldCount) {
              partitionRowType
            } else {
              TypeUtils.project(table.rowType, partitionNames.asJava)
            }
          val currentPartitionSchema =
            if (partitionFieldCount == partitionSchema.length) {
              partitionSchema
            } else {
              SparkTypeUtils.fromPaimonRowType(currentPartitionRowType)
            }
          // CHAR/VARCHAR are converted to plain STRING before converting to
          // external Row, matching how Spark materializes partition values.
          val rowConverter = CatalystTypeConverters.createToScalaConverter(
            CharVarcharUtils.replaceCharVarcharWithString(currentPartitionSchema))
          val rowDataPartitionComputer = new InternalRowPartitionComputer(
            partitionDefaultName,
            currentPartitionRowType,
            partitionNames.toArray,
            legacyPartitionName
          )

          rowDataPartitionComputer.generatePartValues(
            new SparkRow(currentPartitionRowType, rowConverter(r).asInstanceOf[Row]))
      }
    case _ =>
      throw new UnsupportedOperationException("Only FileStoreTable supports partitions.")
Expand All @@ -67,6 +91,7 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
table match {
case fileStoreTable: FileStoreTable =>
val partitions = toPaimonPartitions(rows).toSeq.asJava
logInfo("Try to drop partitions: " + partitions.asScala.mkString(","))
val partitionModification = fileStoreTable.catalogEnvironment().partitionModification()
if (partitionModification != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,37 +35,34 @@ case class PaimonDropPartitionsExec(
refreshCache: () => Unit)
extends LeafV2CommandExec
with Logging {

/**
 * Drops the requested partitions in a single batch.
 *
 * Specs are split into *full* specs (field count equals the partition schema
 * length) and *partial* specs (a prefix of the partition columns). Existence
 * is validated only for full specs; missing ones raise
 * [[NoSuchPartitionsException]] unless `ignoreIfNotExists` is set. All
 * surviving specs are then dropped (or purged) in one `dropPartitions` /
 * `purgePartitions` call so the operation commits atomically as a single
 * snapshot, and the cache is refreshed only if the table actually changed.
 */
override protected def run(): Seq[InternalRow] = {
  val partitionSchema = table.asPartitionable.partitionSchema()

  // Partial specs name only a prefix of the partition columns.
  val (partialPartSpecs, fullPartSpecs) =
    partSpecs.partition(_.ident.numFields != partitionSchema.length)

  val (existsFullPartSpecs, notExistsPartSpecs) =
    fullPartSpecs.partition(spec => table.partitionExists(spec.ident))
  if (notExistsPartSpecs.nonEmpty && !ignoreIfNotExists) {
    throw new NoSuchPartitionsException(
      table.name(),
      notExistsPartSpecs.map(_.ident),
      table.asPartitionable.partitionSchema())
  }
  // Partial specs are passed through as-is; the table resolves them to the
  // matching concrete partitions during the batch drop.
  val partSpecsToDrop = existsFullPartSpecs ++ partialPartSpecs
  val isTableAltered = if (partSpecsToDrop.nonEmpty) {
    val partIdentsToDrop = partSpecsToDrop.map(_.ident).toArray
    if (purge) {
      table.purgePartitions(partIdentsToDrop)
    } else {
      table.dropPartitions(partIdentsToDrop)
    }
  } else false

  if (isTableAltered) refreshCache()
  Seq.empty
}

// Expands a partial partition spec (a prefix of the partition columns) into
// the concrete partition identifiers it matches, by listing the table's
// partitions under that prefix.
// NOTE(review): name looks like a typo for "expandPartialSpec" — confirm
// before renaming, as callers elsewhere in the file may reference it.
private def expendPartialSpec(partialSpec: ResolvedPartitionSpec): Seq[InternalRow] = {
table.listPartitionIdentifiers(partialSpec.names.toArray, partialSpec.ident).toSeq
}

override def output: Seq[Attribute] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,80 @@ class PaimonPartitionManagementTest extends PaimonSparkTestBase {
"SELECT 1 AS a, CAST('2024-06-01 10:30:01' AS TIMESTAMP) AS dt UNION ALL SELECT 2, CAST('2024-06-02 15:45:30' AS TIMESTAMP) ORDER BY dt")
)
}

// Verifies that ALTER TABLE ... DROP PARTITION with a *partial* spec (only a
// prefix of the partition columns) drops every matching sub-partition of a
// three-level partitioned table.
test("Paimon Partition Management: batch drop partitions") {
withTable("T_Batch") {
spark.sql(s"""
|CREATE TABLE T_Batch (a INT, dt INT, hh STRING, mm STRING)
|using paimon
|TBLPROPERTIES ('file.format' = 'avro')
|PARTITIONED BY (dt, hh, mm)
|""".stripMargin)

// Seed five partitions across two dt values.
spark.sql(
"INSERT INTO T_Batch VALUES " +
"(1, 20240101, '00', '00'), " +
"(2, 20240101, '00', '01'), " +
"(3, 20240101, '01', '00'), " +
"(4, 20240102, '00', '00'), " +
"(5, 20240102, '00', '01')")
checkAnswer(
spark.sql("SHOW PARTITIONS T_Batch"),
Row("dt=20240101/hh=00/mm=00") ::
Row("dt=20240101/hh=00/mm=01") ::
Row("dt=20240101/hh=01/mm=00") ::
Row("dt=20240102/hh=00/mm=00") ::
Row("dt=20240102/hh=00/mm=01") :: Nil
)

// First, drop all sub-partitions for dt=20240101 by specifying only the first-level partition column dt
spark.sql("ALTER TABLE T_Batch DROP PARTITION (dt=20240101)")
checkAnswer(
spark.sql("SHOW PARTITIONS T_Batch"),
Row("dt=20240102/hh=00/mm=00") ::
Row("dt=20240102/hh=00/mm=01") :: Nil
)

// Then, drop all sub-partitions under dt=20240102 and hh='00' by specifying the first two partition columns dt and hh
spark.sql("ALTER TABLE T_Batch DROP PARTITION (dt=20240102, hh='00')")
checkAnswer(
spark.sql("SHOW PARTITIONS T_Batch"),
Nil
)
}
}

// Verifies that a single DROP PARTITION statement mixing a full spec
// (dt/hh/mm) and a partial spec (dt only) is applied as ONE atomic commit:
// exactly one OVERWRITE snapshot is produced and only the untouched
// partitions survive.
test("Paimon Partition Management: batch drop partitions with mixed full and partial specs") {
withTable("T_Mixed_Batch") {
spark.sql(s"""
|CREATE TABLE T_Mixed_Batch (a INT, dt INT, hh STRING, mm STRING)
|using paimon
|TBLPROPERTIES ('file.format' = 'avro')
|PARTITIONED BY (dt, hh, mm)
|""".stripMargin)

// Seed five partitions across two dt values.
spark.sql(
"INSERT INTO T_Mixed_Batch VALUES " +
"(1, 20240101, '00', '00'), " +
"(2, 20240101, '00', '01'), " +
"(3, 20240101, '01', '00'), " +
"(4, 20240102, '00', '00'), " +
"(5, 20240102, '00', '01')")

// One statement, two specs: full (dt,hh,mm) plus partial (dt only).
spark.sql(
"ALTER TABLE T_Mixed_Batch DROP PARTITION (dt=20240101, hh='00', mm='00'), " +
"PARTITION (dt=20240102)")

// A single OVERWRITE snapshot proves the batch was committed atomically.
checkAnswer(
spark.sql("SELECT COUNT(*) FROM `T_Mixed_Batch$snapshots` WHERE commit_kind = 'OVERWRITE'"),
Row(1L) :: Nil
)

checkAnswer(
spark.sql("SHOW PARTITIONS T_Mixed_Batch"),
Row("dt=20240101/hh=00/mm=01") ::
Row("dt=20240101/hh=01/mm=00") :: Nil
)
}
}
}