
Commit 533ef2d

zedtang authored and ilicmarkodb committed
[SPARK-45787][SQL] Support Catalog.listColumns for clustering columns
### What changes were proposed in this pull request?

Support the listColumns API for clustering columns.

### Why are the changes needed?

Clustering columns should be supported by the listColumns API, just like partition and bucket columns.

### Does this PR introduce _any_ user-facing change?

Yes, listColumns now returns an additional field `isCluster` indicating whether the column is a clustering column.

Old output for `spark.catalog.listColumns`:
```
+----+-----------+--------+--------+-----------+--------+
|name|description|dataType|nullable|isPartition|isBucket|
+----+-----------+--------+--------+-----------+--------+
|   a|       null|     int|    true|      false|   false|
|   b|       null|  string|    true|      false|   false|
|   c|       null|     int|    true|      false|   false|
|   d|       null|  string|    true|      false|   false|
+----+-----------+--------+--------+-----------+--------+
```

New output:
```
+----+-----------+--------+--------+-----------+--------+---------+
|name|description|dataType|nullable|isPartition|isBucket|isCluster|
+----+-----------+--------+--------+-----------+--------+---------+
|   a|       null|     int|    true|      false|   false|    false|
|   b|       null|  string|    true|      false|   false|    false|
|   c|       null|     int|    true|      false|   false|    false|
|   d|       null|  string|    true|      false|   false|    false|
+----+-----------+--------+--------+-----------+--------+---------+
```

### How was this patch tested?

New unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47451 from zedtang/list-clustering-columns.

Authored-by: Jiaheng Tang <jiaheng.tang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 59f0c4a commit 533ef2d
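
To illustrate the user-facing change, here is a minimal, hypothetical sketch: the table name, column names, and the `USING parquet ... CLUSTER BY` clause are assumptions, and the commented output only shows the expected shape of the result, not captured output.

```scala
// Hypothetical sketch: assumes a data source/catalog that accepts the CLUSTER BY clause.
spark.sql("CREATE TABLE demo_tbl (a INT, b STRING) USING parquet CLUSTER BY (a)")

// After this patch, listColumns surfaces the new isCluster flag.
spark.catalog.listColumns("demo_tbl").show()
// Expected shape (illustrative values):
// +----+-----------+--------+--------+-----------+--------+---------+
// |name|description|dataType|nullable|isPartition|isBucket|isCluster|
// +----+-----------+--------+--------+-----------+--------+---------+
// |   a|       null|     int|    true|      false|   false|     true|
// |   b|       null|  string|    true|      false|   false|    false|
// +----+-----------+--------+--------+-----------+--------+---------+
```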

File tree

10 files changed: +87 −28 lines changed

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 2 additions & 1 deletion
@@ -4152,7 +4152,8 @@ test_that("catalog APIs, listTables, getTable, listColumns, listFunctions, funct
   c <- listColumns("cars")
   expect_equal(nrow(c), 2)
   expect_equal(colnames(c),
-               c("name", "description", "dataType", "nullable", "isPartition", "isBucket"))
+               c("name", "description", "dataType", "nullable", "isPartition", "isBucket",
+                 "isCluster"))
   expect_equal(collect(c)[[1]][[1]], "speed")
   expect_error(listColumns("zxwtyswklpf", "default"),
                "[TABLE_OR_VIEW_NOT_FOUND]*`spark_catalog`.`default`.`zxwtyswklpf`*")

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/interface.scala

Lines changed: 16 additions & 2 deletions
@@ -153,6 +153,8 @@ class Table(
  *   whether the column is a partition column.
  * @param isBucket
  *   whether the column is a bucket column.
+ * @param isCluster
+ *   whether the column is a clustering column.
  * @since 3.5.0
  */
 class Column(
@@ -161,17 +163,29 @@ class Column(
     val dataType: String,
     val nullable: Boolean,
     val isPartition: Boolean,
-    val isBucket: Boolean)
+    val isBucket: Boolean,
+    val isCluster: Boolean)
     extends DefinedByConstructorParams {
 
+  def this(
+      name: String,
+      description: String,
+      dataType: String,
+      nullable: Boolean,
+      isPartition: Boolean,
+      isBucket: Boolean) = {
+    this(name, description, dataType, nullable, isPartition, isBucket, isCluster = false)
+  }
+
   override def toString: String = {
     "Column[" +
       s"name='$name', " +
       Option(description).map { d => s"description='$d', " }.getOrElse("") +
       s"dataType='$dataType', " +
       s"nullable='$nullable', " +
       s"isPartition='$isPartition', " +
-      s"isBucket='$isBucket']"
+      s"isBucket='$isBucket', " +
+      s"isCluster='$isCluster']"
   }
 
 }
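
As a quick follow-up to the `Column` change above, a small hedged sketch of how the two constructors behave after this patch (the values are made up for illustration):

```scala
import org.apache.spark.sql.catalog.Column

// New 7-argument constructor introduced by this patch.
val clustered = new Column("a", null, "int", true, false, false, true)

// Pre-existing 6-argument signature still compiles; isCluster defaults to false.
val plain = new Column("b", null, "string", true, false, false)

println(clustered) // ...isPartition='false', isBucket='false', isCluster='true']
println(plain)     // ...isPartition='false', isBucket='false', isCluster='false']
```

Keeping the old 6-argument constructor preserves compatibility for callers that predate the `isCluster` field.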

python/pyspark/sql/catalog.py

Lines changed: 2 additions & 0 deletions
@@ -65,6 +65,7 @@ class Column(NamedTuple):
     nullable: bool
     isPartition: bool
     isBucket: bool
+    isCluster: bool
 
 
 class Function(NamedTuple):
@@ -663,6 +664,7 @@ def listColumns(self, tableName: str, dbName: Optional[str] = None) -> List[Colu
                     nullable=jcolumn.nullable(),
                     isPartition=jcolumn.isPartition(),
                     isBucket=jcolumn.isBucket(),
+                    isCluster=jcolumn.isCluster(),
                 )
             )
         return columns

python/pyspark/sql/connect/catalog.py

Lines changed: 1 addition & 0 deletions
@@ -208,6 +208,7 @@ def listColumns(self, tableName: str, dbName: Optional[str] = None) -> List[Colu
                 nullable=table[3][i].as_py(),
                 isPartition=table[4][i].as_py(),
                 isBucket=table[5][i].as_py(),
+                isCluster=table[6][i].as_py(),
             )
             for i in range(table.num_rows)
         ]

python/pyspark/sql/tests/test_catalog.py

Lines changed: 4 additions & 0 deletions
@@ -367,6 +367,7 @@ def test_list_columns(self):
                 nullable=True,
                 isPartition=False,
                 isBucket=False,
+                isCluster=False,
             ),
         )
         self.assertEqual(
@@ -378,6 +379,7 @@ def test_list_columns(self):
                 nullable=True,
                 isPartition=False,
                 isBucket=False,
+                isCluster=False,
             ),
         )
         columns2 = sorted(
@@ -393,6 +395,7 @@ def test_list_columns(self):
                 nullable=True,
                 isPartition=False,
                 isBucket=False,
+                isCluster=False,
             ),
         )
         self.assertEqual(
@@ -404,6 +407,7 @@ def test_list_columns(self):
                 nullable=True,
                 isPartition=False,
                 isBucket=False,
+                isCluster=False,
             ),
         )
         self.assertRaisesRegex(

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 7 additions & 2 deletions
@@ -229,7 +229,10 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
     catalog.alterTable(tbl1.copy(properties = Map("toh" -> "frem")))
     val newTbl1 = catalog.getTable("db2", "tbl1")
     assert(!tbl1.properties.contains("toh"))
-    assert(newTbl1.properties.size == tbl1.properties.size + 1)
+    // clusteringColumns property is injected during newTable, so we need
+    // to filter it out before comparing the properties.
+    assert(newTbl1.properties.size ==
+      tbl1.properties.filter { case (key, _) => key != "clusteringColumns" }.size + 1)
     assert(newTbl1.properties.get("toh") == Some("frem"))
   }
 
@@ -1111,7 +1114,9 @@ abstract class CatalogTestUtils {
       },
      provider = Some(defaultProvider),
      partitionColumnNames = Seq("a", "b"),
-      bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
+      bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)),
+      properties = Map(
+        ClusterBySpec.toPropertyWithoutValidation(ClusterBySpec.fromColumnNames(Seq("c1", "c2")))))
   }
 
   def newView(

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 4 additions & 1 deletion
@@ -529,7 +529,10 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
     catalog.alterTable(tbl1.copy(properties = Map("toh" -> "frem")))
     val newTbl1 = catalog.getTableRawMetadata(TableIdentifier("tbl1", Some("db2")))
     assert(!tbl1.properties.contains("toh"))
-    assert(newTbl1.properties.size == tbl1.properties.size + 1)
+    // clusteringColumns property is injected during newTable, so we need
+    // to filter it out before comparing the properties.
+    assert(newTbl1.properties.size ==
+      tbl1.properties.filter { case (key, _) => key != "clusteringColumns" }.size + 1)
     assert(newTbl1.properties.get("toh") == Some("frem"))
     // Alter table without explicitly specifying database
     catalog.setCurrentDatabase("db2")

sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala

Lines changed: 15 additions & 2 deletions
@@ -135,6 +135,7 @@ class Table(
  * @param nullable whether the column is nullable.
  * @param isPartition whether the column is a partition column.
  * @param isBucket whether the column is a bucket column.
+ * @param isCluster whether the column is a clustering column.
  * @since 2.0.0
  */
 @Stable
@@ -144,17 +145,29 @@ class Column(
     val dataType: String,
     val nullable: Boolean,
     val isPartition: Boolean,
-    val isBucket: Boolean)
+    val isBucket: Boolean,
+    val isCluster: Boolean)
   extends DefinedByConstructorParams {
 
+  def this(
+      name: String,
+      description: String,
+      dataType: String,
+      nullable: Boolean,
+      isPartition: Boolean,
+      isBucket: Boolean) = {
+    this(name, description, dataType, nullable, isPartition, isBucket, isCluster = false)
+  }
+
   override def toString: String = {
     "Column[" +
       s"name='$name', " +
       Option(description).map { d => s"description='$d', " }.getOrElse("") +
       s"dataType='$dataType', " +
       s"nullable='$nullable', " +
       s"isPartition='$isPartition', " +
-      s"isBucket='$isBucket']"
+      s"isBucket='$isBucket', " +
+      s"isCluster='$isCluster']"
   }
 
 }

sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

Lines changed: 10 additions & 5 deletions
@@ -394,11 +394,14 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
 
     val columns = sparkSession.sessionState.executePlan(plan).analyzed match {
       case ResolvedTable(_, _, table, _) =>
-        // TODO (SPARK-45787): Support clusterBySpec for listColumns().
-        val (partitionColumnNames, bucketSpecOpt, _) =
+        val (partitionColumnNames, bucketSpecOpt, clusterBySpecOpt) =
           table.partitioning.toImmutableArraySeq.convertTransforms
         val bucketColumnNames = bucketSpecOpt.map(_.bucketColumnNames).getOrElse(Nil)
-        schemaToColumns(table.schema(), partitionColumnNames.contains, bucketColumnNames.contains)
+        val clusteringColumnNames = clusterBySpecOpt.map { clusterBySpec =>
+          clusterBySpec.columnNames.map(_.toString)
+        }.getOrElse(Nil).toSet
+        schemaToColumns(table.schema(), partitionColumnNames.contains, bucketColumnNames.contains,
+          clusteringColumnNames.contains)
 
       case ResolvedPersistentView(_, _, metadata) =>
         schemaToColumns(metadata.schema)
@@ -415,15 +418,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   private def schemaToColumns(
       schema: StructType,
       isPartCol: String => Boolean = _ => false,
-      isBucketCol: String => Boolean = _ => false): Seq[Column] = {
+      isBucketCol: String => Boolean = _ => false,
+      isClusteringCol: String => Boolean = _ => false): Seq[Column] = {
     schema.map { field =>
       new Column(
         name = field.name,
        description = field.getComment().orNull,
        dataType = field.dataType.simpleString,
        nullable = field.nullable,
        isPartition = isPartCol(field.name),
-        isBucket = isBucketCol(field.name))
+        isBucket = isBucketCol(field.name),
+        isCluster = isClusteringCol(field.name))
     }
   }
 
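The `CatalogImpl` change threads one more `String => Boolean` predicate through `schemaToColumns`, defaulting to `_ => false` so the view branches need no changes. A minimal, self-contained sketch of that pattern (illustrative names only, not the actual Spark internals):

```scala
// Sketch of the default-predicate pattern used by schemaToColumns.
object ColumnFlagsSketch {
  def columnsFor(
      fieldNames: Seq[String],
      isPartCol: String => Boolean = _ => false,
      isBucketCol: String => Boolean = _ => false,
      isClusteringCol: String => Boolean = _ => false): Seq[(String, Boolean, Boolean, Boolean)] =
    fieldNames.map(n => (n, isPartCol(n), isBucketCol(n), isClusteringCol(n)))

  def main(args: Array[String]): Unit = {
    val clusteringCols = Set("a")
    // Table paths pass real predicates; view paths rely on the defaults and get all-false flags.
    println(columnsFor(Seq("a", "b"), isClusteringCol = clusteringCols.contains))
    // List((a,false,false,true), (b,false,false,false))
  }
}
```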
sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala

Lines changed: 26 additions & 15 deletions
@@ -108,11 +108,16 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     assert(tableMetadata.schema.nonEmpty, "bad test")
     assert(tableMetadata.partitionColumnNames.nonEmpty, "bad test")
     assert(tableMetadata.bucketSpec.isDefined, "bad test")
+    assert(tableMetadata.clusterBySpec.isDefined, "bad test")
     assert(columns.collect().map(_.name).toSet == tableMetadata.schema.map(_.name).toSet)
     val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
+    val clusteringColumnNames = tableMetadata.clusterBySpec.map { clusterBySpec =>
+      clusterBySpec.columnNames.map(_.toString)
+    }.getOrElse(Nil).toSet
     columns.collect().foreach { col =>
       assert(col.isPartition == tableMetadata.partitionColumnNames.contains(col.name))
       assert(col.isBucket == bucketColumnNames.contains(col.name))
+      assert(col.isCluster == clusteringColumnNames.contains(col.name))
     }
 
     dbName.foreach { db =>
@@ -406,37 +411,42 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
 
   test("SPARK-39615: qualified name with catalog - listColumns") {
     val answers = Map(
-      "col1" -> ("int", true, false, true),
-      "col2" -> ("string", true, false, false),
-      "a" -> ("int", true, true, false),
-      "b" -> ("string", true, true, false)
+      "col1" -> ("int", true, false, true, false),
+      "col2" -> ("string", true, false, false, false),
+      "a" -> ("int", true, true, false, false),
+      "b" -> ("string", true, true, false, false)
     )
 
     assert(spark.catalog.currentCatalog() === "spark_catalog")
     createTable("my_table1")
 
     val columns1 = spark.catalog.listColumns("my_table1").collect()
     assert(answers ===
-      columns1.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+      columns1.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket,
+        c.isCluster)).toMap)
 
     val columns2 = spark.catalog.listColumns("default.my_table1").collect()
     assert(answers ===
-      columns2.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+      columns2.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket,
+        c.isCluster)).toMap)
 
     val columns3 = spark.catalog.listColumns("spark_catalog.default.my_table1").collect()
     assert(answers ===
-      columns3.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+      columns3.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket,
+        c.isCluster)).toMap)
 
     createDatabase("my_db1")
     createTable("my_table2", Some("my_db1"))
 
     val columns4 = spark.catalog.listColumns("my_db1.my_table2").collect()
     assert(answers ===
-      columns4.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+      columns4.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket,
+        c.isCluster)).toMap)
 
     val columns5 = spark.catalog.listColumns("spark_catalog.my_db1.my_table2").collect()
     assert(answers ===
-      columns5.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+      columns5.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket,
+        c.isCluster)).toMap)
 
     val catalogName = "testcat"
     val dbName = "my_db2"
@@ -476,13 +486,13 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
 
   test("Column.toString") {
     assert(new Column("namama", "descaca", "datatapa",
-      nullable = true, isPartition = false, isBucket = true).toString ==
+      nullable = true, isPartition = false, isBucket = true, isCluster = false).toString ==
       "Column[name='namama', description='descaca', dataType='datatapa', " +
-      "nullable='true', isPartition='false', isBucket='true']")
+      "nullable='true', isPartition='false', isBucket='true', isCluster='false']")
     assert(new Column("namama", null, "datatapa",
-      nullable = false, isPartition = true, isBucket = true).toString ==
+      nullable = false, isPartition = true, isBucket = true, isCluster = true).toString ==
       "Column[name='namama', dataType='datatapa', " +
-      "nullable='false', isPartition='true', isBucket='true']")
+      "nullable='false', isPartition='true', isBucket='true', isCluster='true']")
   }
 
   test("catalog classes format in Dataset.show") {
@@ -491,7 +501,8 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
       isTemporary = false)
     val function = new Function("nama", "cataloa", Array("databasa"), "descripta", "classa", false)
     val column = new Column(
-      "nama", "descripta", "typa", nullable = false, isPartition = true, isBucket = true)
+      "nama", "descripta", "typa", nullable = false, isPartition = true, isBucket = true,
+      isCluster = true)
     val dbFields = getConstructorParameterValues(db)
     val tableFields = getConstructorParameterValues(table)
     val functionFields = getConstructorParameterValues(function)
@@ -503,7 +514,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     assert((functionFields(0), functionFields(1), functionFields(3), functionFields(4),
       functionFields(5)) == ("nama", "cataloa", "descripta", "classa", false))
     assert(functionFields(2).asInstanceOf[Array[String]].sameElements(Array("databasa")))
-    assert(columnFields == Seq("nama", "descripta", "typa", false, true, true))
+    assert(columnFields == Seq("nama", "descripta", "typa", false, true, true, true))
     val dbString = CatalogImpl.makeDataset(Seq(db), spark).showString(10)
     val tableString = CatalogImpl.makeDataset(Seq(table), spark).showString(10)
     val functionString = CatalogImpl.makeDataset(Seq(function), spark).showString(10)
