Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Support OPTIMIZE tbl FULL for clustered table #3793

Merged
merged 10 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix up
  • Loading branch information
dabao521 committed Oct 21, 2024
commit 15bca66100c0eed75542bce17cb2290af2882aaa
4 changes: 2 additions & 2 deletions spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
OptimizeTableCommand(
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier),
Option(ctx.partitionPredicate).map(extractRawText(_),
ctx.FULL != null).toSeq)(interleaveBy)
Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq,
isFull = ctx.FULL != null)(interleaveBy)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class DeltaOptimizeBuilder private(table: DeltaTableV2) extends AnalysisHelper {
}
val resolvedTable = ResolvedTable.create(catalog, id, table)
val optimize = OptimizeTableCommand(
resolvedTable, partitionFilter, DeltaOptimizeContext())(zOrderBy = zOrderBy)
resolvedTable, partitionFilter, DeltaOptimizeContext(), isFull = false)(zOrderBy = zOrderBy)
toDataset(sparkSession, optimize)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ case class DeltaReorgTableCommand(
optimizeContext = DeltaOptimizeContext(
reorg = Some(reorgOperation),
minFileSize = Some(0L),
maxDeletedRowsRatio = Some(0d))
maxDeletedRowsRatio = Some(0d)),
isFull = false
)(zOrderBy = Nil)
command.run(sparkSession)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,17 @@ object OptimizeTableCommand {
*
* Note that the returned OptimizeTableCommand will have an *unresolved* child table
* and hence, the command needs to be analyzed before it can be executed.
* TODO: isFull
*
Note that isFull is only used for clustered tables. When set, it indicates that all data
within the table is eligible for clustering, including data that is already clustered based
on different sets of clustering columns.
*/
def apply(
path: Option[String],
tableIdentifier: Option[TableIdentifier],
userPartitionPredicates: Seq[String],
optimizeContext: DeltaOptimizeContext = DeltaOptimizeContext(),
isFull: Boolean)(
isFull: Boolean,
optimizeContext: DeltaOptimizeContext = DeltaOptimizeContext())(
zOrderBy: Seq[UnresolvedAttribute]): OptimizeTableCommand = {
val plan = UnresolvedDeltaPathOrIdentifier(path, tableIdentifier, "OPTIMIZE")
OptimizeTableCommand(plan, userPartitionPredicates, optimizeContext, isFull)(zOrderBy)
Expand All @@ -130,10 +133,10 @@ object OptimizeTableCommand {
/**
* The `optimize` command implementation for Spark SQL. Example SQL:
* {{{
* OPTIMIZE ('/path/to/dir' | delta.table) [WHERE part = 25];
* OPTIMIZE ('/path/to/dir' | delta.table) [WHERE part = 25] [FULL];
* }}}
*
* TODO: isFull
Note that the FULL and WHERE clauses are mutually exclusive: only one of them may be specified.
*/
case class OptimizeTableCommand(
override val child: LogicalPlan,
Expand Down
44 changes: 22 additions & 22 deletions spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,73 +86,73 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
val parser = new DeltaSqlParser(null)
var parsedCmd = parser.parsePlan("OPTIMIZE tbl")
assert(parsedCmd ===
OptimizeTableCommand(None, Some(tblId("tbl")), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("tbl")), Nil, isFull = false)(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedTable(Seq("tbl"), "OPTIMIZE"))

parsedCmd = parser.parsePlan("OPTIMIZE db.tbl")
assert(parsedCmd ===
OptimizeTableCommand(None, Some(tblId("tbl", "db")), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("tbl", "db")), Nil, isFull = false)(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedTable(Seq("db", "tbl"), "OPTIMIZE"))

parsedCmd = parser.parsePlan("OPTIMIZE catalog_foo.db.tbl")
assert(parsedCmd ===
OptimizeTableCommand(None, Some(tblId("tbl", "db", "catalog_foo")), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("tbl", "db", "catalog_foo")), Nil, isFull = false)(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedTable(Seq("catalog_foo", "db", "tbl"), "OPTIMIZE"))

assert(parser.parsePlan("OPTIMIZE tbl_${system:spark.testing}") ===
OptimizeTableCommand(None, Some(tblId("tbl_true")), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("tbl_true")), Nil, isFull = false)(Nil))

withSQLConf("tbl_var" -> "tbl") {
assert(parser.parsePlan("OPTIMIZE ${tbl_var}") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("tbl")), Nil, isFull = false)(Nil))

assert(parser.parsePlan("OPTIMIZE ${spark:tbl_var}") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("tbl")), Nil, isFull = false)(Nil))

assert(parser.parsePlan("OPTIMIZE ${sparkconf:tbl_var}") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("tbl")), Nil, isFull = false)(Nil))

assert(parser.parsePlan("OPTIMIZE ${hiveconf:tbl_var}") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("tbl")), Nil, isFull = false)(Nil))

assert(parser.parsePlan("OPTIMIZE ${hivevar:tbl_var}") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("tbl")), Nil, isFull = false)(Nil))
}

parsedCmd = parser.parsePlan("OPTIMIZE '/path/to/tbl'")
assert(parsedCmd ===
OptimizeTableCommand(Some("/path/to/tbl"), None, Nil)(Nil))
OptimizeTableCommand(Some("/path/to/tbl"), None, Nil, isFull = false)(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedPathBasedDeltaTable("/path/to/tbl", Map.empty, "OPTIMIZE"))

parsedCmd = parser.parsePlan("OPTIMIZE delta.`/path/to/tbl`")
assert(parsedCmd ===
OptimizeTableCommand(None, Some(tblId("/path/to/tbl", "delta")), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("/path/to/tbl", "delta")), Nil, isFull = false)(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedTable(Seq("delta", "/path/to/tbl"), "OPTIMIZE"))

assert(parser.parsePlan("OPTIMIZE tbl WHERE part = 1") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"))(Nil))
OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"), isFull = false)(Nil))

assert(parser.parsePlan("OPTIMIZE tbl ZORDER BY (col1)") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Nil)
OptimizeTableCommand(None, Some(tblId("tbl")), Nil, isFull = false)
(Seq(unresolvedAttr("col1"))))

assert(parser.parsePlan("OPTIMIZE tbl WHERE part = 1 ZORDER BY col1, col2.subcol") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"))(
OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"), isFull = false)(
Seq(unresolvedAttr("col1"), unresolvedAttr("col2", "subcol"))))

assert(parser.parsePlan("OPTIMIZE tbl WHERE part = 1 ZORDER BY (col1, col2.subcol)") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"))(
OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"), isFull = false)(
Seq(unresolvedAttr("col1"), unresolvedAttr("col2", "subcol"))))

// Validate OPTIMIZE works correctly with FULL keyword.
parsedCmd = parser.parsePlan("OPTIMIZE tbl FULL")
assert(parsedCmd ===
OptimizeTableCommand(None, Some(tblId("tbl"), isFull = true), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("tbl")), Nil, isFull = true)(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedTable(Seq("tbl"), "OPTIMIZE"))

Expand All @@ -168,7 +168,7 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
UnresolvedPathBasedDeltaTable("/path/to/tbl", Map.empty, "OPTIMIZE"))

parsedCmd = parser.parsePlan("OPTIMIZE delta.`/path/to/tbl`")
parsedCmd = parser.parsePlan("OPTIMIZE delta.`/path/to/tbl` FULL")
assert(parsedCmd ===
OptimizeTableCommand(None, Some(tblId("/path/to/tbl", "delta")), Nil, isFull = true)(Nil))
assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
Expand All @@ -181,22 +181,22 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {

// Use the new keywords in table name
assert(parser.parsePlan("OPTIMIZE optimize") ===
OptimizeTableCommand(None, Some(tblId("optimize")), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("optimize")), Nil, isFull = false)(Nil))

assert(parser.parsePlan("OPTIMIZE zorder") ===
OptimizeTableCommand(None, Some(tblId("zorder")), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("zorder")), Nil, isFull = false)(Nil))

assert(parser.parsePlan("OPTIMIZE full") ===
OptimizeTableCommand(None, Some(tblId("full")), Nil)(Nil))
OptimizeTableCommand(None, Some(tblId("full")), Nil, isFull = false)(Nil))

// Use the new keywords in column name
assert(parser.parsePlan("OPTIMIZE tbl WHERE zorder = 1 and optimize = 2 and full = 3") ===
OptimizeTableCommand(None,
Some(tblId("tbl"))
, Seq("zorder = 1 and optimize = 2 and full = 3"))(Nil))
, Seq("zorder = 1 and optimize = 2 and full = 3"), isFull = false)(Nil))

assert(parser.parsePlan("OPTIMIZE tbl ZORDER BY (optimize, zorder, full)") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Nil)(
OptimizeTableCommand(None, Some(tblId("tbl")), Nil, isFull = false)(
Seq(unresolvedAttr("optimize"), unresolvedAttr("zorder"), unresolvedAttr("full"))))
}

Expand Down