[Spark] Support OPTIMIZE tbl FULL for clustered table #3793

Merged (10 commits, Oct 30, 2024)
5 changes: 3 additions & 2 deletions spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -93,7 +93,7 @@ statement
(clusterBySpec | CLUSTER BY NONE) #alterTableClusterBy
| ALTER TABLE table=qualifiedName
(ALTER | CHANGE) COLUMN? column=qualifiedName SYNC IDENTITY #alterTableSyncIdentity
| OPTIMIZE (path=STRING | table=qualifiedName)
| OPTIMIZE (path=STRING | table=qualifiedName) FULL?
(WHERE partitionPredicate=predicateToken)?
(zorderSpec)? #optimizeTable
| REORG TABLE table=qualifiedName
@@ -237,7 +237,7 @@ nonReserved
: VACUUM | USING | INVENTORY | RETAIN | HOURS | DRY | RUN
| CONVERT | TO | DELTA | PARTITIONED | BY
| DESC | DESCRIBE | LIMIT | DETAIL
| GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE
| GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE | FULL
| IDENTITY | SYNC | COLUMN | CHANGE
| REORG | APPLY | PURGE | UPGRADE | UNIFORM | ICEBERG_COMPAT_VERSION
| RESTORE | AS | OF
@@ -275,6 +275,7 @@ EXISTS: 'EXISTS';
FALSE: 'FALSE';
FEATURE: 'FEATURE';
FOR: 'FOR';
FULL: 'FULL';
GENERATE: 'GENERATE';
HISTORY: 'HISTORY';
HOURS: 'HOURS';
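With FULL added as an optional trailing modifier on the optimizeTable rule and registered as a non-reserved keyword, the parser accepts statements like the ones sketched below. This is an illustrative Scala snippet, not part of the diff; the table and path names are made up and spark is assumed to be an active SparkSession.

// Illustrative only: SQL shapes accepted by the extended OPTIMIZE rule.
spark.sql("OPTIMIZE events FULL")               // table identifier + FULL
spark.sql("OPTIMIZE delta.`/tmp/events` FULL")  // path-based Delta table + FULL
spark.sql("OPTIMIZE '/tmp/events' FULL")        // raw path string + FULL
// FULL remains non-reserved, so it can still appear as an ordinary identifier:
spark.sql("OPTIMIZE full")                      // optimizes a table named `full`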
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -1950,6 +1950,12 @@
],
"sqlState" : "0AKDC"
},
"DELTA_OPTIMIZE_FULL_NOT_SUPPORTED" : {
"message" : [
"OPTIMIZE FULL is only supported for clustered tables with non-empty clustering columns."
],
"sqlState" : "42601"
},
"DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE" : {
"message" : [
"'overwriteSchema' cannot be used in dynamic partition overwrite mode."
@@ -368,7 +368,8 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
OptimizeTableCommand(
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier),
Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq)(interleaveBy)
Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq,
DeltaOptimizeContext(isFull = ctx.FULL != null))(interleaveBy)
}

/**
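Per the parser tests added later in this PR, the presence of the optional FULL token is all that flips isFull; OPTIMIZE tbl FULL parses into a command of roughly the following shape. This sketch mirrors DeltaSqlParserSuite, with TableIdentifier written out instead of the test helper tblId.

// Sketch of the logical plan produced for "OPTIMIZE tbl FULL".
OptimizeTableCommand(
  None,                                 // no path
  Some(TableIdentifier("tbl")),         // table identifier
  Nil,                                  // no WHERE partition predicate
  DeltaOptimizeContext(isFull = true)   // ctx.FULL != null => isFull = true
)(Nil)                                  // no ZORDER BY columns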
@@ -3419,6 +3419,12 @@ trait DeltaErrorsBase
messageParameters = Array(s"${zOrderBy.map(_.name).mkString(", ")}"))
}

def optimizeFullNotSupportedException(): Throwable = {
new DeltaUnsupportedOperationException(
errorClass = "DELTA_OPTIMIZE_FULL_NOT_SUPPORTED",
messageParameters = Array.empty)
}

def alterClusterByNotOnDeltaTableException(): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_ONLY_OPERATION",
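Together with the DELTA_OPTIMIZE_FULL_NOT_SUPPORTED entry above, this helper defines the failure path for OPTIMIZE FULL on unsupported tables. A hypothetical test-style check is sketched below; the table name is illustrative and intercept is assumed to come from ScalaTest as in Delta's suites.

// Illustrative only: OPTIMIZE FULL on a non-clustered table is expected to fail.
val e = intercept[DeltaUnsupportedOperationException] {
  sql("OPTIMIZE plain_partitioned_table FULL")
}
assert(e.getErrorClass == "DELTA_OPTIMIZE_FULL_NOT_SUPPORTED")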
@@ -128,8 +128,10 @@ object OptimizeTableCommand {
/**
* The `optimize` command implementation for Spark SQL. Example SQL:
* {{{
* OPTIMIZE ('/path/to/dir' | delta.table) [WHERE part = 25];
* OPTIMIZE ('/path/to/dir' | delta.table) [WHERE part = 25] [FULL];
* }}}
*
* Note that the FULL and WHERE clauses are mutually exclusive.
*/
case class OptimizeTableCommand(
override val child: LogicalPlan,
@@ -151,7 +153,8 @@ case class OptimizeTableCommand(
throw DeltaErrors.notADeltaTableException(table.deltaLog.dataPath.toString)
}

if (ClusteredTableUtils.isSupported(snapshot.protocol)) {
val isClusteredTable = ClusteredTableUtils.isSupported(snapshot.protocol)
if (isClusteredTable) {
if (userPartitionPredicates.nonEmpty) {
throw DeltaErrors.clusteringWithPartitionPredicatesException(userPartitionPredicates)
}
@@ -160,6 +163,11 @@ }
}
}

lazy val clusteringColumns = ClusteringColumnInfo.extractLogicalNames(snapshot)
if (optimizeContext.isFull && (!isClusteredTable || clusteringColumns.isEmpty)) {
throw DeltaErrors.optimizeFullNotSupportedException()
}

val partitionColumns = snapshot.metadata.partitionColumns
// Parse the predicate expression into Catalyst expression and verify only simple filters
// on partition columns are present
@@ -199,12 +207,14 @@ case class OptimizeTableCommand(
* this threshold will be rewritten by the OPTIMIZE command. If not
* specified, [[DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO]]
* will be used. This parameter must be set to `0` when [[reorg]] is set.
* @param isFull whether OPTIMIZE FULL is run. This is only for clustered tables.
*/
case class DeltaOptimizeContext(
reorg: Option[DeltaReorgOperation] = None,
minFileSize: Option[Long] = None,
maxFileSize: Option[Long] = None,
maxDeletedRowsRatio: Option[Double] = None) {
maxDeletedRowsRatio: Option[Double] = None,
isFull: Boolean = false) {
if (reorg.nonEmpty) {
require(
minFileSize.contains(0L) && maxDeletedRowsRatio.contains(0d),
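The validation added above can be read as a small standalone precondition: OPTIMIZE FULL is rejected unless the target is a clustered table with at least one clustering column. A simplified sketch of that check follows; it is not the exact Delta code path, which throws DeltaErrors.optimizeFullNotSupportedException().

// Simplified sketch of the OPTIMIZE FULL precondition, assuming the inputs were
// already derived from the snapshot as in the command above.
def validateOptimizeFull(
    isClusteredTable: Boolean,
    clusteringColumns: Seq[String],
    isFull: Boolean): Unit = {
  if (isFull && (!isClusteredTable || clusteringColumns.isEmpty)) {
    // Surfaces as DELTA_OPTIMIZE_FULL_NOT_SUPPORTED in Delta.
    throw new UnsupportedOperationException(
      "OPTIMIZE FULL is only supported for clustered tables with non-empty clustering columns.")
  }
}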
@@ -109,7 +109,7 @@ object OptimizeTableStrategy {
zOrderBy: Seq[String]): OptimizeTableStrategy = getMode(snapshot, zOrderBy) match {
case OptimizeTableMode.CLUSTERING =>
ClusteringStrategy(
sparkSession, ClusteringColumnInfo.extractLogicalNames(snapshot))
sparkSession, ClusteringColumnInfo.extractLogicalNames(snapshot), optimizeContext)
case OptimizeTableMode.ZORDER => ZOrderStrategy(sparkSession, zOrderBy)
case OptimizeTableMode.COMPACTION =>
CompactionStrategy(sparkSession, optimizeContext)
@@ -188,7 +188,8 @@ case class ZOrderStrategy(
/** Implements clustering strategy for clustered tables */
case class ClusteringStrategy(
override val sparkSession: SparkSession,
clusteringColumns: Seq[String]) extends OptimizeTableStrategy {
clusteringColumns: Seq[String],
optimizeContext: DeltaOptimizeContext) extends OptimizeTableStrategy {

override val optimizeTableMode: OptimizeTableMode.Value = OptimizeTableMode.CLUSTERING

@@ -237,39 +238,58 @@ case class ClusteringStrategy(
* clustering. The requirements to pick candidate files are:
*
* 1. Candidate files are either un-clustered (missing clusteringProvider) or the
* clusteringProvider is "liquid".
* 2. Clustered files (clusteringProvider is set) with different clustering columns are skipped.
* When clustering columns are changed, existing clustered data is not re-clustered.
* clusteringProvider is "liquid" when isFull is unset.
* 2. Clustered files with different clustering columns are handled differently based on the
* isFull setting: if isFull is unset, existing clustered files with different columns are
* skipped; if isFull is set, all files are considered.
* 3. Files that belong to partial ZCubes are picked. A ZCube is considered partial if its
* size is smaller than [[DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE]].
* 4. If all files are clustered and they all belong to a single ZCube, all files are
* filtered out.
*/
private def applyMinZCube(files: Seq[AddFile]): Seq[AddFile] = {
val targetSize = sparkSession.sessionState.conf.getConf(DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE)
// Skip files with from different clusteringProviders or files clustered by a different set
// of clustering columns.
val inputFiles = files.iterator.filter { file =>
clusteringStatsCollector.inputStats.updateStats(file)
// Keep all files if isFull is set, otherwise skip files with different clusteringProviders
// or files clustered by a different set of clustering columns.
val (candidateFiles, skippedClusteredFiles) = files.iterator.map { f =>
// Note that updateStats is called outside the Iterator.partition lambda because
// Scala 2.13 does not invoke that lambda in file order, which would violate
// updateStats' requirement that files arrive ordered by ZCube id (files have
// been ordered before applyMinZCube is called).
clusteringStatsCollector.inputStats.updateStats(f)
f
}.partition { file =>
val sameOrMissingClusteringProvider =
file.clusteringProvider.forall(_ == ClusteredTableUtils.clusteringProvider)

// If clustered before, remove those with different clustering columns.
val zCubeInfo = ZCubeInfo.getForFile(file)
val unmatchedClusteringColumns = zCubeInfo.exists(_.zOrderBy != clusteringColumns)
sameOrMissingClusteringProvider && !unmatchedClusteringColumns
}.map(AddFileWithNumRecords.createFromFile)
}
// Skip files that belong to a ZCUBE that is larger than target ZCUBE size.
val smallZCubeFiles = ZCube.filterOutLargeZCubes(inputFiles, targetSize)

// Skip smallZCubeFiles if they all belong to a single ZCUBE.
ZCube.filterOutSingleZCubes(smallZCubeFiles).map { file =>
clusteringStatsCollector.outputStats.updateStats(file.addFile)
file.addFile
}.toSeq
// Note that ZCube.filterOutLargeZCubes requires clustered files have
// the same clustering columns, so skippedClusteredFiles are not included.
val smallZCubeFiles = ZCube.filterOutLargeZCubes(
candidateFiles.map(AddFileWithNumRecords.createFromFile), targetSize)

if (optimizeContext.isFull && skippedClusteredFiles.nonEmpty) {
// Clustered files with different clustering columns have to be re-clustered.
val finalFiles = (smallZCubeFiles.map(_.addFile) ++ skippedClusteredFiles).toSeq
finalFiles.map { f =>
clusteringStatsCollector.outputStats.updateStats(f)
f
}
} else {
// Skip smallZCubeFiles if they all belong to a single ZCUBE.
ZCube.filterOutSingleZCubes(smallZCubeFiles).map { file =>
clusteringStatsCollector.outputStats.updateStats(file.addFile)
file.addFile
}.toSeq
}
}

/** Metrics for clustering when [[isClusteredTable]] is true. */
private val clusteringStatsCollector: ClusteringStatsCollector =
ClusteringStatsCollector(clusteringColumns)
ClusteringStatsCollector(clusteringColumns, optimizeContext)
}
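To summarize the selection rules documented on applyMinZCube, here is a minimal sketch under a simplified file model. It is not the Delta implementation: the real code works on AddFile and ZCubeInfo and additionally applies the partial-ZCube and single-ZCube filters, which are omitted here.

// Hypothetical, simplified model of which files become candidates for (re)clustering.
case class FileInfo(
    clusteringProvider: Option[String],   // None => file has never been clustered
    clusteredBy: Option[Seq[String]])     // columns the file was clustered by, if any

def candidateFiles(
    files: Seq[FileInfo],
    currentClusteringColumns: Seq[String],
    isFull: Boolean): Seq[FileInfo] = {
  val (matching, clusteredDifferently) = files.partition { f =>
    val providerOk = f.clusteringProvider.forall(_ == "liquid")
    val columnsOk = f.clusteredBy.forall(_ == currentClusteringColumns)
    providerOk && columnsOk
  }
  // Without FULL, only un-clustered files and files clustered by the current columns
  // are eligible; with FULL, files clustered by other providers or columns are
  // re-clustered as well.
  if (isFull) matching ++ clusteredDifferently else matching
}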
@@ -26,8 +26,11 @@ import org.apache.spark.sql.delta.zorder.ZCubeInfo.ZCubeID
* calling updateStats on every new file seen.
* The number of ZCubes, number of files from matching cubes and number of unoptimized files are
* captured here.
*
* @param zOrderBy zOrder or clustering columns.
* @param isFull whether OPTIMIZE FULL is run. This is only for clustered tables.
*/
class ZCubeFileStatsCollector(zOrderBy: Seq[String]) {
class ZCubeFileStatsCollector(zOrderBy: Seq[String], isFull: Boolean) {

/** map that holds the file statistics Map("element" -> (number of files, total file size)) */
private var processedZCube: ZCubeID = _
@@ -47,7 +50,9 @@ class ZCubeFileStatsCollector(zOrderBy: Seq[String]) {
/** method to update the zCubeFileStats incrementally by file */
def updateStats(file: AddFile): AddFile = {
val zCubeInfo = ZCubeInfo.getForFile(file)
if (zCubeInfo.isDefined && zCubeInfo.get.zOrderBy == zOrderBy) {
// Note that clustered files with different clustering columns are considered candidate
// files when isFull is set.
if (zCubeInfo.isDefined && (isFull || zCubeInfo.get.zOrderBy == zOrderBy)) {
if (processedZCube != zCubeInfo.get.zCubeID) {
processedZCube = zCubeInfo.get.zCubeID
numZCubes += 1
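The change to updateStats reduces to one relaxed condition: when isFull is set, a clustered file contributes to the ZCube statistics even if it was clustered by a different set of columns. A minimal sketch of that predicate (names are illustrative):

// Does a file's recorded ZCube info count toward the input/output ZCube stats?
def countsTowardZCubeStats(
    fileZOrderBy: Option[Seq[String]],  // clustering/Z-order columns recorded on the file, if any
    currentZOrderBy: Seq[String],
    isFull: Boolean): Boolean =
  fileZOrderBy.exists(cols => isFull || cols == currentZOrderBy)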
@@ -22,8 +22,8 @@ package org.apache.spark.sql.delta.commands.optimize
class ZOrderMetrics(zOrderBy: Seq[String]) {

var strategyName: String = _
val inputStats = new ZCubeFileStatsCollector(zOrderBy)
val outputStats = new ZCubeFileStatsCollector(zOrderBy)
val inputStats = new ZCubeFileStatsCollector(zOrderBy, isFull = false)
val outputStats = new ZCubeFileStatsCollector(zOrderBy, isFull = false)
var numOutputCubes = 0

def getZOrderStats(): ZOrderStats = {
@@ -16,6 +16,7 @@

package org.apache.spark.sql.delta.skipping.clustering

import org.apache.spark.sql.delta.commands.DeltaOptimizeContext
import org.apache.spark.sql.delta.commands.optimize.ZCubeFileStatsCollector

/**
@@ -54,10 +55,10 @@ case class ClusteringStats(
/**
* A class help collecting ClusteringStats.
*/
case class ClusteringStatsCollector(zOrderBy: Seq[String]) {
case class ClusteringStatsCollector(zOrderBy: Seq[String], optimizeContext: DeltaOptimizeContext) {

val inputStats = new ZCubeFileStatsCollector(zOrderBy)
val outputStats = new ZCubeFileStatsCollector(zOrderBy)
val inputStats = new ZCubeFileStatsCollector(zOrderBy, optimizeContext.isFull)
val outputStats = new ZCubeFileStatsCollector(zOrderBy, optimizeContext.isFull)
var numOutputZCubes = 0

def getClusteringStats: ClusteringStats = {
38 changes: 33 additions & 5 deletions spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterByTransform
import org.apache.spark.sql.delta.CloneTableSQLTestUtils
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
import org.apache.spark.sql.delta.{UnresolvedPathBasedDeltaTable, UnresolvedPathBasedTable}
import org.apache.spark.sql.delta.commands.{DescribeDeltaDetailCommand, DescribeDeltaHistory, OptimizeTableCommand, DeltaReorgTable}
import org.apache.spark.sql.delta.commands.{DeltaOptimizeContext, DescribeDeltaDetailCommand, DescribeDeltaHistory, OptimizeTableCommand, DeltaReorgTable}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.{TableIdentifier, TimeTravel}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedTable}
@@ -148,6 +148,31 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
assert(parser.parsePlan("OPTIMIZE tbl WHERE part = 1 ZORDER BY (col1, col2.subcol)") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"))(
Seq(unresolvedAttr("col1"), unresolvedAttr("col2", "subcol"))))

// Validate OPTIMIZE works correctly with FULL keyword.
parsedCmd = parser.parsePlan("OPTIMIZE tbl FULL")
assert(parsedCmd ===
OptimizeTableCommand(None, Some(tblId("tbl")), Nil, DeltaOptimizeContext(isFull = true))(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedTable(Seq("tbl"), "OPTIMIZE"))

parsedCmd = parser.parsePlan("OPTIMIZE catalog_foo.db.tbl FULL")
assert(parsedCmd === OptimizeTableCommand(
None, Some(tblId("tbl", "db", "catalog_foo")), Nil, DeltaOptimizeContext(isFull = true))(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedTable(Seq("catalog_foo", "db", "tbl"), "OPTIMIZE"))

parsedCmd = parser.parsePlan("OPTIMIZE '/path/to/tbl' FULL")
assert(parsedCmd === OptimizeTableCommand(
Some("/path/to/tbl"), None, Nil, DeltaOptimizeContext(isFull = true))(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedPathBasedDeltaTable("/path/to/tbl", Map.empty, "OPTIMIZE"))

parsedCmd = parser.parsePlan("OPTIMIZE delta.`/path/to/tbl` FULL")
assert(parsedCmd === OptimizeTableCommand(
None, Some(tblId("/path/to/tbl", "delta")), Nil, DeltaOptimizeContext(isFull = true))(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedTable(Seq("delta", "/path/to/tbl"), "OPTIMIZE"))
}

test("OPTIMIZE command new tokens are non-reserved keywords") {
@@ -161,15 +186,18 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
assert(parser.parsePlan("OPTIMIZE zorder") ===
OptimizeTableCommand(None, Some(tblId("zorder")), Nil)(Nil))

assert(parser.parsePlan("OPTIMIZE full") ===
OptimizeTableCommand(None, Some(tblId("full")), Nil)(Nil))

// Use the new keywords in column name
assert(parser.parsePlan("OPTIMIZE tbl WHERE zorder = 1 and optimize = 2") ===
assert(parser.parsePlan("OPTIMIZE tbl WHERE zorder = 1 and optimize = 2 and full = 3") ===
OptimizeTableCommand(None,
Some(tblId("tbl"))
, Seq("zorder = 1 and optimize = 2"))(Nil))
, Seq("zorder = 1 and optimize = 2 and full = 3"))(Nil))

assert(parser.parsePlan("OPTIMIZE tbl ZORDER BY (optimize, zorder)") ===
assert(parser.parsePlan("OPTIMIZE tbl ZORDER BY (optimize, zorder, full)") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Nil)(
Seq(unresolvedAttr("optimize"), unresolvedAttr("zorder"))))
Seq(unresolvedAttr("optimize"), unresolvedAttr("zorder"), unresolvedAttr("full"))))
}

test("DESCRIBE DETAIL command is parsed as expected") {
@@ -59,6 +59,19 @@ trait ClusteredTableTestUtilsBase
verifyDescribeHistoryOperationParameters(table)
}

/**
* Runs OPTIMIZE FULL on the table and calls postHook on the resulting metrics.
*
* @param table the name of the table
* @param postHook callback invoked with the OptimizeMetrics returned by the OPTIMIZE command
*/
def runOptimizeFull(table: String)(postHook: OptimizeMetrics => Unit): Unit = {
postHook(sql(s"OPTIMIZE $table FULL").select($"metrics.*").as[OptimizeMetrics].head())

// Verify Delta history operation parameters' clusterBy
verifyDescribeHistoryOperationParameters(table)
}

def verifyClusteringColumnsInDomainMetadata(
snapshot: Snapshot,
logicalColumnNames: Seq[String]): Unit = {
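A hypothetical use of the new helper in a clustered-table test could look like the following; the table name and the metric assertions are illustrative and assume the usual OptimizeMetrics fields.

// Illustrative only: run OPTIMIZE FULL and inspect the returned metrics.
runOptimizeFull("target_table") { metrics =>
  // With FULL, files clustered by previous clustering columns are rewritten as well,
  // so some files are expected to be removed and re-added here.
  assert(metrics.numFilesRemoved > 0)
  assert(metrics.numFilesAdded > 0)
}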