Skip to content

Commit c128ac5

Browse files
imback82 authored and cloud-fan committed
[SPARK-29511][SQL] DataSourceV2: Support CREATE NAMESPACE
### What changes were proposed in this pull request?

This PR adds `CREATE NAMESPACE` support for V2 catalogs.

### Why are the changes needed?

Currently, you cannot explicitly create namespaces for v2 catalogs.

### Does this PR introduce any user-facing change?

The user can now perform the following:

```SQL
CREATE NAMESPACE mycatalog.ns
```

to create a namespace `ns` inside the `mycatalog` V2 catalog.

### How was this patch tested?

Added unit tests.

Closes #26166 from imback82/create_namespace.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent e674909 commit c128ac5

File tree

14 files changed

+259
-91
lines changed

14 files changed

+259
-91
lines changed

docs/sql-keywords.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ Below is a list of all the keywords in Spark SQL.
210210
<tr><td>PRECEDING</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
211211
<tr><td>PRIMARY</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
212212
<tr><td>PRINCIPALS</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
213+
<tr><td>PROPERTIES</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
213214
<tr><td>PURGE</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
214215
<tr><td>QUERY</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
215216
<tr><td>RANGE</td><td>non-reserved</td><td>non-reserved</td><td>reserved</td></tr>

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,10 @@ statement
8383
: query #statementDefault
8484
| ctes? dmlStatementNoWith #dmlStatement
8585
| USE NAMESPACE? multipartIdentifier #use
86-
| CREATE database (IF NOT EXISTS)? db=errorCapturingIdentifier
86+
| CREATE (database | NAMESPACE) (IF NOT EXISTS)? multipartIdentifier
8787
((COMMENT comment=STRING) |
8888
locationSpec |
89-
(WITH DBPROPERTIES tablePropertyList))* #createDatabase
89+
(WITH (DBPROPERTIES | PROPERTIES) tablePropertyList))* #createNamespace
9090
| ALTER database db=errorCapturingIdentifier
9191
SET DBPROPERTIES tablePropertyList #setDatabaseProperties
9292
| ALTER database db=errorCapturingIdentifier
@@ -1039,6 +1039,7 @@ ansiNonReserved
10391039
| POSITION
10401040
| PRECEDING
10411041
| PRINCIPALS
1042+
| PROPERTIES
10421043
| PURGE
10431044
| QUERY
10441045
| RANGE
@@ -1299,6 +1300,7 @@ nonReserved
12991300
| PRECEDING
13001301
| PRIMARY
13011302
| PRINCIPALS
1303+
| PROPERTIES
13021304
| PURGE
13031305
| QUERY
13041306
| RANGE
@@ -1564,6 +1566,7 @@ POSITION: 'POSITION';
15641566
PRECEDING: 'PRECEDING';
15651567
PRIMARY: 'PRIMARY';
15661568
PRINCIPALS: 'PRINCIPALS';
1569+
PROPERTIES: 'PROPERTIES';
15671570
PURGE: 'PURGE';
15681571
QUERY: 'QUERY';
15691572
RANGE: 'RANGE';

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,13 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
168168
s"Can not specify catalog `${catalog.name}` for view ${viewName.quoted} " +
169169
s"because view support in catalog has not been implemented yet")
170170

171+
case c @ CreateNamespaceStatement(NonSessionCatalog(catalog, nameParts), _, _) =>
172+
CreateNamespace(
173+
catalog.asNamespaceCatalog,
174+
nameParts,
175+
c.ifNotExists,
176+
c.properties)
177+
171178
case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) =>
172179
ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern)
173180

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2307,6 +2307,46 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
23072307
}
23082308
}
23092309

2310+
/**
2311+
* Create a [[CreateNamespaceStatement]] command.
2312+
*
2313+
* For example:
2314+
* {{{
2315+
* CREATE NAMESPACE [IF NOT EXISTS] ns1.ns2.ns3
2316+
* create_namespace_clauses;
2317+
*
2318+
* create_namespace_clauses (order insensitive):
2319+
* [COMMENT namespace_comment]
2320+
* [LOCATION path]
2321+
* [WITH PROPERTIES (key1=val1, key2=val2, ...)]
2322+
* }}}
2323+
*/
2324+
override def visitCreateNamespace(ctx: CreateNamespaceContext): LogicalPlan = withOrigin(ctx) {
2325+
checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
2326+
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
2327+
checkDuplicateClauses(ctx.PROPERTIES, "WITH PROPERTIES", ctx)
2328+
checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx)
2329+
2330+
if (!ctx.PROPERTIES.isEmpty && !ctx.DBPROPERTIES.isEmpty) {
2331+
throw new ParseException(s"Either PROPERTIES or DBPROPERTIES is allowed.", ctx)
2332+
}
2333+
2334+
var properties = ctx.tablePropertyList.asScala.headOption
2335+
.map(visitPropertyKeyValues)
2336+
.getOrElse(Map.empty)
2337+
Option(ctx.comment).map(string).map {
2338+
properties += CreateNamespaceStatement.COMMENT_PROPERTY_KEY -> _
2339+
}
2340+
ctx.locationSpec.asScala.headOption.map(visitLocationSpec).map {
2341+
properties += CreateNamespaceStatement.LOCATION_PROPERTY_KEY -> _
2342+
}
2343+
2344+
CreateNamespaceStatement(
2345+
visitMultipartIdentifier(ctx.multipartIdentifier),
2346+
ctx.EXISTS != null,
2347+
properties)
2348+
}
2349+
23102350
/**
23112351
* Create a [[ShowNamespacesStatement]] command.
23122352
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,19 @@ case class InsertIntoStatement(
282282
case class ShowTablesStatement(namespace: Option[Seq[String]], pattern: Option[String])
283283
extends ParsedStatement
284284

285+
/**
286+
* A CREATE NAMESPACE statement, as parsed from SQL.
287+
*/
288+
case class CreateNamespaceStatement(
289+
namespace: Seq[String],
290+
ifNotExists: Boolean,
291+
properties: Map[String, String]) extends ParsedStatement
292+
293+
object CreateNamespaceStatement {
294+
val COMMENT_PROPERTY_KEY: String = "comment"
295+
val LOCATION_PROPERTY_KEY: String = "location"
296+
}
297+
285298
/**
286299
* A SHOW NAMESPACES statement, as parsed from SQL.
287300
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,14 @@ case class ReplaceTableAsSelect(
237237
}
238238
}
239239

240+
/**
241+
* The logical plan of the CREATE NAMESPACE command that works for v2 catalogs.
242+
*/
243+
case class CreateNamespace(
244+
catalog: SupportsNamespaces,
245+
namespace: Seq[String],
246+
ifNotExists: Boolean,
247+
properties: Map[String, String]) extends Command
240248

241249
/**
242250
* The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,90 @@ class DDLParserSuite extends AnalysisTest {
845845
ShowTablesStatement(Some(Seq("tbl")), Some("*dog*")))
846846
}
847847

848+
test("create namespace -- backward compatibility with DATABASE/DBPROPERTIES") {
849+
val expected = CreateNamespaceStatement(
850+
Seq("a", "b", "c"),
851+
ifNotExists = true,
852+
Map(
853+
"a" -> "a",
854+
"b" -> "b",
855+
"c" -> "c",
856+
"comment" -> "namespace_comment",
857+
"location" -> "/home/user/db"))
858+
859+
comparePlans(
860+
parsePlan(
861+
"""
862+
|CREATE NAMESPACE IF NOT EXISTS a.b.c
863+
|WITH PROPERTIES ('a'='a', 'b'='b', 'c'='c')
864+
|COMMENT 'namespace_comment' LOCATION '/home/user/db'
865+
""".stripMargin),
866+
expected)
867+
868+
comparePlans(
869+
parsePlan(
870+
"""
871+
|CREATE DATABASE IF NOT EXISTS a.b.c
872+
|WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
873+
|COMMENT 'namespace_comment' LOCATION '/home/user/db'
874+
""".stripMargin),
875+
expected)
876+
}
877+
878+
test("create namespace -- check duplicates") {
879+
def createDatabase(duplicateClause: String): String = {
880+
s"""
881+
|CREATE NAMESPACE IF NOT EXISTS a.b.c
882+
|$duplicateClause
883+
|$duplicateClause
884+
""".stripMargin
885+
}
886+
val sql1 = createDatabase("COMMENT 'namespace_comment'")
887+
val sql2 = createDatabase("LOCATION '/home/user/db'")
888+
val sql3 = createDatabase("WITH PROPERTIES ('a'='a', 'b'='b', 'c'='c')")
889+
val sql4 = createDatabase("WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
890+
891+
intercept(sql1, "Found duplicate clauses: COMMENT")
892+
intercept(sql2, "Found duplicate clauses: LOCATION")
893+
intercept(sql3, "Found duplicate clauses: WITH PROPERTIES")
894+
intercept(sql4, "Found duplicate clauses: WITH DBPROPERTIES")
895+
}
896+
897+
test("create namespace - property values must be set") {
898+
assertUnsupported(
899+
sql = "CREATE NAMESPACE a.b.c WITH PROPERTIES('key_without_value', 'key_with_value'='x')",
900+
containsThesePhrases = Seq("key_without_value"))
901+
}
902+
903+
test("create namespace -- either PROPERTIES or DBPROPERTIES is allowed") {
904+
val sql =
905+
s"""
906+
|CREATE NAMESPACE IF NOT EXISTS a.b.c
907+
|WITH PROPERTIES ('a'='a', 'b'='b', 'c'='c')
908+
|WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
909+
""".stripMargin
910+
intercept(sql, "Either PROPERTIES or DBPROPERTIES is allowed")
911+
}
912+
913+
test("create namespace - support for other types in PROPERTIES") {
914+
val sql =
915+
"""
916+
|CREATE NAMESPACE a.b.c
917+
|LOCATION '/home/user/db'
918+
|WITH PROPERTIES ('a'=1, 'b'=0.1, 'c'=TRUE)
919+
""".stripMargin
920+
comparePlans(
921+
parsePlan(sql),
922+
CreateNamespaceStatement(
923+
Seq("a", "b", "c"),
924+
ifNotExists = false,
925+
Map(
926+
"a" -> "1",
927+
"b" -> "0.1",
928+
"c" -> "true",
929+
"location" -> "/home/user/db")))
930+
}
931+
848932
test("show databases: basic") {
849933
comparePlans(
850934
parsePlan("SHOW DATABASES"),

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class ParserUtilsSuite extends SparkFunSuite {
5050
|WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
5151
""".stripMargin
5252
) { parser =>
53-
parser.statement().asInstanceOf[CreateDatabaseContext]
53+
parser.statement().asInstanceOf[CreateNamespaceContext]
5454
}
5555

5656
val emptyContext = buildContext("") { parser =>

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
2424
import org.apache.spark.sql.catalyst.rules.Rule
2525
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
2626
import org.apache.spark.sql.connector.expressions.Transform
27-
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand}
27+
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand}
2828
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
2929
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
3030
import org.apache.spark.sql.internal.SQLConf
@@ -255,6 +255,19 @@ class ResolveSessionCatalog(
255255
case DropViewStatement(SessionCatalog(catalog, viewName), ifExists) =>
256256
DropTableCommand(viewName.asTableIdentifier, ifExists, isView = true, purge = false)
257257

258+
case c @ CreateNamespaceStatement(SessionCatalog(catalog, nameParts), _, _) =>
259+
if (nameParts.length != 1) {
260+
throw new AnalysisException(
261+
s"The database name is not valid: ${nameParts.quoted}")
262+
}
263+
264+
val comment = c.properties.get(CreateNamespaceStatement.COMMENT_PROPERTY_KEY)
265+
val location = c.properties.get(CreateNamespaceStatement.LOCATION_PROPERTY_KEY)
266+
val newProperties = c.properties -
267+
CreateNamespaceStatement.COMMENT_PROPERTY_KEY -
268+
CreateNamespaceStatement.LOCATION_PROPERTY_KEY
269+
CreateDatabaseCommand(nameParts.head, c.ifNotExists, location, comment, newProperties)
270+
258271
case ShowTablesStatement(Some(SessionCatalog(catalog, nameParts)), pattern) =>
259272
if (nameParts.length != 1) {
260273
throw new AnalysisException(

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -329,33 +329,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
329329
)
330330
}
331331

332-
/**
333-
* Create a [[CreateDatabaseCommand]] command.
334-
*
335-
* For example:
336-
* {{{
337-
* CREATE DATABASE [IF NOT EXISTS] database_name
338-
* create_database_clauses;
339-
*
340-
* create_database_clauses (order insensitive):
341-
* [COMMENT database_comment]
342-
* [LOCATION path]
343-
* [WITH DBPROPERTIES (key1=val1, key2=val2, ...)]
344-
* }}}
345-
*/
346-
override def visitCreateDatabase(ctx: CreateDatabaseContext): LogicalPlan = withOrigin(ctx) {
347-
checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
348-
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
349-
checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx)
350-
351-
CreateDatabaseCommand(
352-
ctx.db.getText,
353-
ctx.EXISTS != null,
354-
ctx.locationSpec.asScala.headOption.map(visitLocationSpec),
355-
Option(ctx.comment).map(string),
356-
ctx.tablePropertyList.asScala.headOption.map(visitPropertyKeyValues).getOrElse(Map.empty))
357-
}
358-
359332
/**
360333
* Create an [[AlterDatabasePropertiesCommand]] command.
361334
*

0 commit comments

Comments
 (0)