Skip to content

Commit cbe6846

Browse files
zedtang authored and cloud-fan committed
[SPARK-48760][SQL] Fix CatalogV2Util.applyClusterByChanges
### What changes were proposed in this pull request?

#47156 introduced a bug in `CatalogV2Util.applyClusterByChanges`: it removed the existing `ClusterByTransform` first, regardless of whether there was a `ClusterBy` table change. This meant any table change would remove the clustering columns from the table. This PR fixes the bug by removing the `ClusterByTransform` only when there is a `ClusterBy` table change.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Amended an existing test to catch this bug.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47288 from zedtang/fix-apply-cluster-by-changes.

Authored-by: Jiaheng Tang <jiaheng.tang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 2bee9a6 commit cbe6846

File tree

2 files changed

+13
-9
lines changed

2 files changed

+13
-9
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -178,16 +178,19 @@ private[sql] object CatalogV2Util {
178178
schema: StructType,
179179
changes: Seq[TableChange]): Array[Transform] = {
180180

181-
val newPartitioning = partitioning.filterNot(_.isInstanceOf[ClusterByTransform]).toBuffer
182-
changes.foreach {
183-
case clusterBy: ClusterBy =>
184-
newPartitioning += ClusterBySpec.extractClusterByTransform(
181+
var newPartitioning = partitioning
182+
// If there is a clusterBy change (only the first one), we overwrite the existing
183+
// clustering columns.
184+
val clusterByOpt = changes.collectFirst { case c: ClusterBy => c }
185+
clusterByOpt.foreach { clusterBy =>
186+
newPartitioning = partitioning.map {
187+
case _: ClusterByTransform => ClusterBySpec.extractClusterByTransform(
185188
schema, ClusterBySpec(clusterBy.clusteringColumns.toIndexedSeq), conf.resolver)
186-
187-
case _ =>
188-
// ignore other changes
189+
case other => other
190+
}
189191
}
190-
newPartitioning.toArray
192+
193+
newPartitioning
191194
}
192195

193196
/**

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,9 @@ trait DescribeTableSuiteBase extends QueryTest with DDLCommandTestUtils {
181181

182182
test("describe a clustered table") {
183183
withNamespaceAndTable("ns", "tbl") { tbl =>
184-
sql(s"CREATE TABLE $tbl (col1 STRING COMMENT 'this is comment', col2 struct<x:int, y:int>) " +
184+
sql(s"CREATE TABLE $tbl (col1 STRING, col2 struct<x:int, y:int>) " +
185185
s"$defaultUsing CLUSTER BY (col1, col2.x)")
186+
sql(s"ALTER TABLE $tbl ALTER COLUMN col1 COMMENT 'this is comment';")
186187
val descriptionDf = sql(s"DESC $tbl")
187188
assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq(
188189
("col_name", StringType),

0 commit comments

Comments (0)