Skip to content

Commit 29ebd93

Browse files
fuwhu, cloud-fan
authored and committed
[SPARK-29979][SQL] Add basic/reserved property key constants in TableCatalog and SupportsNamespaces
### What changes were proposed in this pull request? Add "comment" and "location" property key constants to TableCatalog and SupportsNamespaces, and update the implementation classes to use these constants instead of hard-coded strings. ### Why are the changes needed? Currently, some basic/reserved keys (e.g. "location", "comment") of table and namespace properties are hard-coded or defined in specific logical plan implementation classes. These keys can be centralized in the TableCatalog and SupportsNamespaces interfaces and shared across different implementation classes. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing unit tests. Closes #26617 from fuwhu/SPARK-29979. Authored-by: fuwhu <bestwwg@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent f09c1a3 commit 29ebd93

File tree

12 files changed

+71
-47
lines changed

12 files changed

+71
-47
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
2222
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
2323

24+
import java.util.Arrays;
25+
import java.util.List;
2426
import java.util.Map;
2527

2628
/**
@@ -39,6 +41,23 @@
3941
@Experimental
4042
public interface SupportsNamespaces extends CatalogPlugin {
4143

44+
/**
45+
* A property to specify the location of the namespace. If the namespace
46+
* needs to store files, it should be under this location.
47+
*/
48+
String PROP_LOCATION = "location";
49+
50+
/**
51+
* A property to specify the description of the namespace. The description
52+
* will be returned in the result of "DESCRIBE NAMESPACE" command.
53+
*/
54+
String PROP_COMMENT = "comment";
55+
56+
/**
57+
* The list of reserved namespace properties.
58+
*/
59+
List<String> RESERVED_PROPERTIES = Arrays.asList(PROP_COMMENT, PROP_LOCATION);
60+
4261
/**
4362
* Return a default namespace for the catalog.
4463
* <p>

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,18 @@
3737
*/
3838
@Experimental
3939
public interface TableCatalog extends CatalogPlugin {
40+
41+
/**
42+
* A property to specify the location of the table. The files of the table
43+
* should be under this location.
44+
*/
45+
String PROP_LOCATION = "location";
46+
47+
/**
48+
* A property to specify the description of the table.
49+
*/
50+
String PROP_COMMENT = "comment";
51+
4052
/**
4153
* List the tables in a namespace from the catalog.
4254
* <p>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
2020
import org.apache.spark.sql.AnalysisException
2121
import org.apache.spark.sql.catalyst.plans.logical._
2222
import org.apache.spark.sql.catalyst.rules.Rule
23-
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange}
23+
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange}
2424

2525
/**
2626
* Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
@@ -78,7 +78,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
7878
throw new AnalysisException(
7979
"ALTER TABLE SET LOCATION does not support partition for v2 tables.")
8080
}
81-
val changes = Seq(TableChange.setProperty("location", newLoc))
81+
val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, newLoc))
8282
createAlterTable(nameParts, catalog, tableName, changes)
8383

8484
case AlterViewSetPropertiesStatement(
@@ -97,8 +97,8 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
9797
AlterNamespaceSetProperties(catalog.asNamespaceCatalog, nameParts, properties)
9898

9999
case AlterNamespaceSetLocationStatement(NonSessionCatalog(catalog, nameParts), location) =>
100-
AlterNamespaceSetProperties(
101-
catalog.asNamespaceCatalog, nameParts, Map("location" -> location))
100+
AlterNamespaceSetProperties(catalog.asNamespaceCatalog, nameParts,
101+
Map(SupportsNamespaces.PROP_LOCATION -> location))
102102

103103
case RenameTableStatement(NonSessionCatalog(catalog, oldName), newNameParts, isView) =>
104104
if (isView) {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
3939
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
4040
import org.apache.spark.sql.catalyst.util.IntervalUtils
4141
import org.apache.spark.sql.catalyst.util.IntervalUtils.IntervalUnit
42+
import org.apache.spark.sql.connector.catalog.SupportsNamespaces
4243
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
4344
import org.apache.spark.sql.internal.SQLConf
4445
import org.apache.spark.sql.types._
@@ -2492,10 +2493,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
24922493
.map(visitPropertyKeyValues)
24932494
.getOrElse(Map.empty)
24942495
Option(ctx.comment).map(string).map {
2495-
properties += CreateNamespaceStatement.COMMENT_PROPERTY_KEY -> _
2496+
properties += SupportsNamespaces.PROP_COMMENT -> _
24962497
}
24972498
ctx.locationSpec.asScala.headOption.map(visitLocationSpec).map {
2498-
properties += CreateNamespaceStatement.LOCATION_PROPERTY_KEY -> _
2499+
properties += SupportsNamespaces.PROP_LOCATION -> _
24992500
}
25002501

25012502
CreateNamespaceStatement(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -345,11 +345,6 @@ case class CreateNamespaceStatement(
345345
ifNotExists: Boolean,
346346
properties: Map[String, String]) extends ParsedStatement
347347

348-
object CreateNamespaceStatement {
349-
val COMMENT_PROPERTY_KEY: String = "comment"
350-
val LOCATION_PROPERTY_KEY: String = "location"
351-
}
352-
353348
/**
354349
* A DROP NAMESPACE statement, as parsed from SQL.
355350
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -245,11 +245,11 @@ private[sql] object CatalogV2Util {
245245
"you can only specify one of them.")
246246
}
247247

248-
if ((options.contains("comment") || properties.contains("comment"))
249-
&& comment.isDefined) {
248+
if ((options.contains(TableCatalog.PROP_COMMENT)
249+
|| properties.contains(TableCatalog.PROP_COMMENT)) && comment.isDefined) {
250250
throw new AnalysisException(
251-
"COMMENT and option/property 'comment' are both used to set the table comment, you can " +
252-
"only specify one of them.")
251+
s"COMMENT and option/property '${TableCatalog.PROP_COMMENT}' " +
252+
s"are both used to set the table comment, you can only specify one of them.")
253253
}
254254

255255
if (options.contains("provider") || properties.contains("provider")) {
@@ -267,8 +267,9 @@ private[sql] object CatalogV2Util {
267267

268268
// convert USING, LOCATION, and COMMENT clauses to table properties
269269
tableProperties += ("provider" -> provider)
270-
comment.map(text => tableProperties += ("comment" -> text))
271-
location.orElse(options.get("path")).map(loc => tableProperties += ("location" -> loc))
270+
comment.map(text => tableProperties += (TableCatalog.PROP_COMMENT -> text))
271+
location.orElse(options.get("path")).map(
272+
loc => tableProperties += (TableCatalog.PROP_LOCATION -> loc))
272273

273274
tableProperties.toMap
274275
}

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
519519

520520
def getLocationIfExists: Option[(String, String)] = {
521521
val opts = CaseInsensitiveMap(extraOptions.toMap)
522-
opts.get("path").map("location" -> _)
522+
opts.get("path").map(TableCatalog.PROP_LOCATION -> _)
523523
}
524524

525525
val command = (mode, tableOpt) match {

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717

1818
package org.apache.spark.sql.catalyst.analysis
1919

20+
import scala.collection.JavaConverters._
21+
2022
import org.apache.spark.sql.{AnalysisException, SaveMode}
2123
import org.apache.spark.sql.catalyst.TableIdentifier
2224
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils}
2325
import org.apache.spark.sql.catalyst.plans.logical._
2426
import org.apache.spark.sql.catalyst.rules.Rule
25-
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
27+
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table}
2628
import org.apache.spark.sql.connector.expressions.Transform
2729
import org.apache.spark.sql.execution.command._
2830
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
@@ -147,7 +149,7 @@ class ResolveSessionCatalog(
147149
throw new AnalysisException(
148150
"ALTER TABLE SET LOCATION does not support partition for v2 tables.")
149151
}
150-
val changes = Seq(TableChange.setProperty("location", newLoc))
152+
val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, newLoc))
151153
createAlterTable(nameParts, catalog, tableName, changes)
152154
}
153155

@@ -301,11 +303,9 @@ class ResolveSessionCatalog(
301303
s"The database name is not valid: ${nameParts.quoted}")
302304
}
303305

304-
val comment = c.properties.get(CreateNamespaceStatement.COMMENT_PROPERTY_KEY)
305-
val location = c.properties.get(CreateNamespaceStatement.LOCATION_PROPERTY_KEY)
306-
val newProperties = c.properties -
307-
CreateNamespaceStatement.COMMENT_PROPERTY_KEY -
308-
CreateNamespaceStatement.LOCATION_PROPERTY_KEY
306+
val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
307+
val location = c.properties.get(SupportsNamespaces.PROP_LOCATION)
308+
val newProperties = c.properties -- SupportsNamespaces.RESERVED_PROPERTIES.asScala
309309
CreateDatabaseCommand(nameParts.head, c.ifNotExists, location, comment, newProperties)
310310

311311
case d @ DropNamespaceStatement(SessionCatalog(_, nameParts), _, _) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@ import org.apache.spark.sql.catalyst.InternalRow
2424
import org.apache.spark.sql.catalyst.encoders.RowEncoder
2525
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
2626
import org.apache.spark.sql.connector.catalog.SupportsNamespaces
27-
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.COMMENT_TABLE_PROP
28-
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.LOCATION_TABLE_PROP
29-
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.RESERVED_PROPERTIES
3027
import org.apache.spark.sql.types.StructType
3128

3229
/**
@@ -45,10 +42,12 @@ case class DescribeNamespaceExec(
4542
val metadata = catalog.loadNamespaceMetadata(ns)
4643

4744
rows += toCatalystRow("Namespace Name", ns.last)
48-
rows += toCatalystRow("Description", metadata.get(COMMENT_TABLE_PROP))
49-
rows += toCatalystRow("Location", metadata.get(LOCATION_TABLE_PROP))
45+
rows += toCatalystRow("Description", metadata.get(SupportsNamespaces.PROP_COMMENT))
46+
rows += toCatalystRow("Location", metadata.get(SupportsNamespaces.PROP_LOCATION))
5047
if (isExtended) {
51-
val properties = metadata.asScala.toSeq.filter(p => !RESERVED_PROPERTIES.contains(p._1))
48+
val properties =
49+
metadata.asScala.toSeq.filter(p =>
50+
!SupportsNamespaces.RESERVED_PROPERTIES.contains(p._1))
5251
if (properties.nonEmpty) {
5352
rows += toCatalystRow("Properties", properties.mkString("(", ",", ")"))
5453
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
8585
val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions)
8686
val provider = properties.getOrDefault("provider", conf.defaultDataSourceName)
8787
val tableProperties = properties.asScala
88-
val location = Option(properties.get(LOCATION_TABLE_PROP))
88+
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
8989
val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap)
9090
.copy(locationUri = location.map(CatalogUtils.stringToURI))
9191
val tableType = if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
@@ -100,7 +100,7 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
100100
bucketSpec = maybeBucketSpec,
101101
properties = tableProperties.toMap,
102102
tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
103-
comment = Option(properties.get(COMMENT_TABLE_PROP)))
103+
comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
104104

105105
try {
106106
catalog.createTable(tableDesc, ignoreIfExists = false)
@@ -227,7 +227,8 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
227227
case Array(db) =>
228228
// validate that this catalog's reserved properties are not removed
229229
changes.foreach {
230-
case remove: RemoveProperty if RESERVED_PROPERTIES.contains(remove.property) =>
230+
case remove: RemoveProperty
231+
if SupportsNamespaces.RESERVED_PROPERTIES.contains(remove.property) =>
231232
throw new UnsupportedOperationException(
232233
s"Cannot remove reserved property: ${remove.property}")
233234
case _ =>
@@ -262,9 +263,6 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
262263
}
263264

264265
private[sql] object V2SessionCatalog {
265-
val COMMENT_TABLE_PROP: String = "comment"
266-
val LOCATION_TABLE_PROP: String = "location"
267-
val RESERVED_PROPERTIES: Set[String] = Set(COMMENT_TABLE_PROP, LOCATION_TABLE_PROP)
268266

269267
/**
270268
* Convert v2 Transforms to v1 partition columns and an optional bucket spec.
@@ -294,12 +292,12 @@ private[sql] object V2SessionCatalog {
294292
defaultLocation: Option[URI] = None): CatalogDatabase = {
295293
CatalogDatabase(
296294
name = db,
297-
description = metadata.getOrDefault(COMMENT_TABLE_PROP, ""),
298-
locationUri = Option(metadata.get(LOCATION_TABLE_PROP))
295+
description = metadata.getOrDefault(SupportsNamespaces.PROP_COMMENT, ""),
296+
locationUri = Option(metadata.get(SupportsNamespaces.PROP_LOCATION))
299297
.map(CatalogUtils.stringToURI)
300298
.orElse(defaultLocation)
301299
.getOrElse(throw new IllegalArgumentException("Missing database location")),
302-
properties = metadata.asScala.toMap -- Seq("comment", "location"))
300+
properties = metadata.asScala.toMap -- SupportsNamespaces.RESERVED_PROPERTIES.asScala)
303301
}
304302

305303
private implicit class CatalogDatabaseHelper(catalogDatabase: CatalogDatabase) {
@@ -309,8 +307,8 @@ private[sql] object V2SessionCatalog {
309307
catalogDatabase.properties.foreach {
310308
case (key, value) => metadata.put(key, value)
311309
}
312-
metadata.put(LOCATION_TABLE_PROP, catalogDatabase.locationUri.toString)
313-
metadata.put(COMMENT_TABLE_PROP, catalogDatabase.description)
310+
metadata.put(SupportsNamespaces.PROP_LOCATION, catalogDatabase.locationUri.toString)
311+
metadata.put(SupportsNamespaces.PROP_COMMENT, catalogDatabase.description)
314312

315313
metadata.asJava
316314
}

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import org.apache.spark.sql._
2424
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
2525
import org.apache.spark.sql.connector.catalog._
2626
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
27-
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
2827
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
2928
import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
3029
import org.apache.spark.sql.sources.SimpleScanSource
@@ -830,7 +829,7 @@ class DataSourceV2SQLSuite
830829
sql(s"CREATE NAMESPACE testcat.test LOCATION '$path'")
831830
val metadata =
832831
catalog("testcat").asNamespaceCatalog.loadNamespaceMetadata(Array("test")).asScala
833-
val catalogPath = metadata(V2SessionCatalog.LOCATION_TABLE_PROP)
832+
val catalogPath = metadata(SupportsNamespaces.PROP_LOCATION)
834833
assert(catalogPath.equals(catalogPath))
835834
}
836835
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ import org.scalatest.BeforeAndAfter
2727
import org.apache.spark.sql.AnalysisException
2828
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
2929
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
30-
import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, NamespaceChange, TableChange}
31-
import org.apache.spark.sql.internal.SQLConf
30+
import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, TableChange}
3231
import org.apache.spark.sql.test.SharedSparkSession
3332
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
3433
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -757,7 +756,8 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
757756
expected: scala.collection.Map[String, String],
758757
actual: scala.collection.Map[String, String]): Unit = {
759758
// remove location and comment that are automatically added by HMS unless they are expected
760-
val toRemove = V2SessionCatalog.RESERVED_PROPERTIES.filter(expected.contains)
759+
val toRemove =
760+
SupportsNamespaces.RESERVED_PROPERTIES.asScala.filter(expected.contains)
761761
assert(expected -- toRemove === actual)
762762
}
763763

0 commit comments

Comments
 (0)