
Commit 58be82a

Ngone51 authored and cloud-fan committed
[SPARK-30098][SQL] Use default datasource as provider for CREATE TABLE syntax
### What changes were proposed in this pull request?

In this PR, we propose to use the value of `spark.sql.sources.default` as the provider for the `CREATE TABLE` syntax instead of `hive` in Spark 3.0. To help migration, we introduce a legacy conf `spark.sql.legacy.createHiveTableByDefault.enabled` and set its default to `false`.

### Why are the changes needed?

1. Currently, the `CREATE TABLE` syntax uses the hive provider to create tables, while the `DataFrameWriter.saveAsTable` API uses the value of `spark.sql.sources.default` as the provider. It would be better to make them consistent.

2. Users may get confused in some cases. For example:

```sql
CREATE TABLE t1 (c1 INT) USING PARQUET;
CREATE TABLE t2 (c1 INT);
```

In these two DDLs, users may think that `t2` should also use parquet as its default provider, since Spark always advertises parquet as the default format. However, it is hive in this case. On the other hand, if we omit the USING clause in a CTAS statement, we do pick parquet by default if `spark.sql.hive.convertCTAS=true`:

```sql
CREATE TABLE t3 USING PARQUET AS SELECT 1 AS VALUE;
CREATE TABLE t4 AS SELECT 1 AS VALUE;
```

Taken together, these two cases can be really confusing.

3. Spark SQL is now independent and widely used on its own; it does not need to be fully consistent with Hive's behavior.

### Does this PR introduce any user-facing change?

Yes. Before this PR, the `CREATE TABLE` syntax used the hive provider; now it uses the value of `spark.sql.sources.default` as its provider.

### How was this patch tested?

Added tests in `DDLParserSuite` and `HiveDDLSuite`.

Closes #26736 from Ngone51/dev-create-table-using-parquet-by-default.

Lead-authored-by: wuyi <yi.wu@databricks.com>
Co-authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent c1a5f94 commit 58be82a
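
To make the behavior change concrete, here is a small SQL sketch (assuming a Spark 3.0 build with `spark.sql.sources.default` left at its default, parquet; the table names mirror the examples in the description). All four tables now resolve to the same provider, which can be checked with `DESC TABLE EXTENDED`:

```sql
CREATE TABLE t1 (c1 INT) USING PARQUET;
CREATE TABLE t2 (c1 INT);                         -- no USING: falls back to spark.sql.sources.default
CREATE TABLE t3 USING PARQUET AS SELECT 1 AS value;
CREATE TABLE t4 AS SELECT 1 AS value;             -- CTAS without USING: same fallback

-- The Provider field should now read "parquet" for all four tables.
DESC TABLE EXTENDED t2;
DESC TABLE EXTENDED t4;
```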

File tree: 25 files changed, +261 -186 lines

docs/sql-migration-guide.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -253,6 +253,8 @@ license: |
     </td>
   </tr>
 </table>
+
+  - Since Spark 3.0, CREATE TABLE without a specific provider will use the value of `spark.sql.sources.default` as its provider. In Spark version 2.4 and earlier, it was hive. To restore the behavior before Spark 3.0, you can set `spark.sql.legacy.createHiveTableByDefault.enabled` to `true`.
 
   - Since Spark 3.0, the unary arithmetic operator plus(`+`) only accepts string, numeric and interval type values as inputs. Besides, `+` with a integral string representation will be coerced to double value, e.g. `+'1'` results `1.0`. In Spark version 2.4 and earlier, this operator is ignored. There is no type checking for it, thus, all type values with a `+` prefix are valid, e.g. `+ array(1, 2)` is valid and results `[1, 2]`. Besides, there is no type coercion for it at all, e.g. in Spark 2.4, the result of `+'1'` is string `1`.
 
```
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 23 additions & 13 deletions
```diff
@@ -29,6 +29,12 @@ grammar SqlBase;
    */
   public boolean legacy_exponent_literal_as_decimal_enabled = false;
 
+  /**
+   * When false, CREATE TABLE syntax without a provider will use
+   * the value of spark.sql.sources.default as its provider.
+   */
+  public boolean legacy_create_hive_table_by_default_enabled = false;
+
   /**
    * Verify whether current token is a valid decimal token (which contains dot).
    * Returns true if the character that follows the token is not a digit or letter or underscore.
@@ -101,13 +107,13 @@ statement
         (RESTRICT | CASCADE)?                                          #dropNamespace
     | SHOW (DATABASES | NAMESPACES) ((FROM | IN) multipartIdentifier)?
         (LIKE? pattern=STRING)?                                        #showNamespaces
-    | createTableHeader ('(' colTypeList ')')? tableProvider
-        ((OPTIONS options=tablePropertyList) |
-        (PARTITIONED BY partitioning=transformList) |
-        bucketSpec |
-        locationSpec |
-        (COMMENT comment=STRING) |
-        (TBLPROPERTIES tableProps=tablePropertyList))*
+    | {!legacy_create_hive_table_by_default_enabled}?
+        createTableHeader ('(' colTypeList ')')? tableProvider?
+        createTableClauses
+        (AS? query)?                                                   #createTable
+    | {legacy_create_hive_table_by_default_enabled}?
+        createTableHeader ('(' colTypeList ')')? tableProvider
+        createTableClauses
         (AS? query)?                                                   #createTable
     | createTableHeader ('(' columns=colTypeList ')')?
         ((COMMENT comment=STRING) |
@@ -128,12 +134,7 @@ statement
         locationSpec |
         (TBLPROPERTIES tableProps=tablePropertyList))*                 #createTableLike
     | replaceTableHeader ('(' colTypeList ')')? tableProvider
-        ((OPTIONS options=tablePropertyList) |
-        (PARTITIONED BY partitioning=transformList) |
-        bucketSpec |
-        locationSpec |
-        (COMMENT comment=STRING) |
-        (TBLPROPERTIES tableProps=tablePropertyList))*
+        createTableClauses
         (AS? query)?                                                   #replaceTable
     | ANALYZE TABLE multipartIdentifier partitionSpec? COMPUTE STATISTICS
         (identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)?    #analyze
@@ -352,6 +353,15 @@ tableProvider
     : USING multipartIdentifier
     ;
 
+createTableClauses
+    :((OPTIONS options=tablePropertyList) |
+     (PARTITIONED BY partitioning=transformList) |
+     bucketSpec |
+     locationSpec |
+     (COMMENT comment=STRING) |
+     (TBLPROPERTIES tableProps=tablePropertyList))*
+    ;
+
 tablePropertyList
     : '(' tableProperty (',' tableProperty)* ')'
     ;
```
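
Since the non-legacy alternative makes `tableProvider` optional and both alternatives share the new `createTableClauses` rule, the clauses that previously required a `USING` clause can now accompany a provider-less `CREATE TABLE`. A hedged sketch (hypothetical table and column names):

```sql
-- Parses through the new #createTable alternative; with no USING clause the
-- provider later defaults to spark.sql.sources.default.
CREATE TABLE events (id BIGINT, event_date STRING)
PARTITIONED BY (event_date)
COMMENT 'events table without an explicit provider'
TBLPROPERTIES ('owner' = 'data-eng');
```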

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 33 additions & 33 deletions
```diff
@@ -2379,6 +2379,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    */
   type TableHeader = (Seq[String], Boolean, Boolean, Boolean)
 
+  /**
+   * Type to keep track of table clauses:
+   * (partitioning, bucketSpec, options, locationSpec, properties, comment).
+   */
+  type TableClauses = (Seq[Transform], Option[BucketSpec], Map[String, String],
+    Map[String, String], Option[String], Option[String])
+
   /**
    * Validate a create table statement and return the [[TableIdentifier]].
    */
@@ -2614,6 +2621,24 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       ctx.EXTENDED != null)
   }
 
+  override def visitCreateTableClauses(ctx: CreateTableClausesContext): TableClauses = {
+    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
+    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
+    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
+
+    val partitioning: Seq[Transform] =
+      Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil)
+    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
+    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
+    val comment = Option(ctx.comment).map(string)
+    (partitioning, bucketSpec, properties, options, location, comment)
+  }
+
   /**
    * Create a table, returning a [[CreateTableStatement]] logical plan.
    *
@@ -2639,26 +2664,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
     if (external) {
-      operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
+      operationNotAllowed("CREATE EXTERNAL TABLE ...", ctx)
     }
-
-    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
-    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
-    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
-    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
-    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
-    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
-
     val schema = Option(ctx.colTypeList()).map(createSchema)
-    val partitioning: Seq[Transform] =
-      Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil)
-    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
-    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-
-    val provider = ctx.tableProvider.multipartIdentifier.getText
-    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
-    val comment = Option(ctx.comment).map(string)
+    val defaultProvider = conf.defaultDataSourceName
+    val provider =
+      Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse(defaultProvider)
+    val (partitioning, bucketSpec, properties, options, location, comment) =
+      visitCreateTableClauses(ctx.createTableClauses())
 
     Option(ctx.query).map(plan) match {
       case Some(_) if temp =>
@@ -2713,23 +2726,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       operationNotAllowed("REPLACE EXTERNAL TABLE ... USING", ctx)
     }
 
-    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
-    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
-    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
-    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
-    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
-    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
-
+    val (partitioning, bucketSpec, properties, options, location, comment) =
+      visitCreateTableClauses(ctx.createTableClauses())
     val schema = Option(ctx.colTypeList()).map(createSchema)
-    val partitioning: Seq[Transform] =
-      Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil)
-    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
-    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-
     val provider = ctx.tableProvider.multipartIdentifier.getText
-    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
-    val comment = Option(ctx.comment).map(string)
     val orCreate = ctx.replaceTableHeader().CREATE() != null
 
     Option(ctx.query).map(plan) match {
```
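
The fallback in `visitCreateTable` goes through `conf.defaultDataSourceName`, so a provider-less `CREATE TABLE` tracks whatever `spark.sql.sources.default` currently is, not parquet specifically. A sketch (assuming the built-in orc source is available, as in a standard Spark build; table names are hypothetical):

```sql
SET spark.sql.sources.default=orc;
CREATE TABLE orc_by_default (c1 INT);       -- provider resolves to orc

SET spark.sql.sources.default=parquet;
CREATE TABLE parquet_by_default (c1 INT);   -- provider resolves to parquet

-- DESC TABLE EXTENDED reports the resolved provider for each table.
DESC TABLE EXTENDED orc_by_default;
```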

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -101,6 +101,7 @@ abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Log
     lexer.addErrorListener(ParseErrorListener)
     lexer.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
     lexer.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
+    lexer.legacy_create_hive_table_by_default_enabled = conf.createHiveTableByDefaultEnabled
     lexer.SQL_standard_keyword_behavior = SQLStandardKeywordBehavior
 
     val tokenStream = new CommonTokenStream(lexer)
@@ -110,6 +111,7 @@ abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Log
     parser.addErrorListener(ParseErrorListener)
     parser.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
     parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
+    parser.legacy_create_hive_table_by_default_enabled = conf.createHiveTableByDefaultEnabled
     parser.SQL_standard_keyword_behavior = SQLStandardKeywordBehavior
 
     try {
```

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -1966,6 +1966,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED =
+    buildConf("spark.sql.legacy.createHiveTableByDefault.enabled")
+      .internal()
+      .doc("When set to true, CREATE TABLE syntax without a provider will use hive " +
+        s"instead of the value of ${DEFAULT_DATA_SOURCE_NAME.key}.")
+      .booleanConf
+      .createWithDefault(false)
+
+
   val LEGACY_INTEGRALDIVIDE_RETURN_LONG = buildConf("spark.sql.legacy.integralDivide.returnBigint")
     .doc("If it is set to true, the div operator returns always a bigint. This behavior was " +
       "inherited from Hive. Otherwise, the return type is the data type of the operands.")
@@ -2583,6 +2592,9 @@ class SQLConf extends Serializable with Logging {
   def exponentLiteralAsDecimalEnabled: Boolean =
     getConf(SQLConf.LEGACY_EXPONENT_LITERAL_AS_DECIMAL_ENABLED)
 
+  def createHiveTableByDefaultEnabled: Boolean =
+    getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED)
+
   def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG)
 
   def nameNonStructGroupingKeyAsValue: Boolean =
```
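
A minimal sketch of how the new flag is meant to be used, as the migration note describes (assuming a Spark 3.0 session; the conf is internal but can still be set per session, and the table names are hypothetical):

```sql
-- Restore the Spark 2.4 behavior: CREATE TABLE without USING creates a Hive serde table.
SET spark.sql.legacy.createHiveTableByDefault.enabled=true;
CREATE TABLE legacy_tab (c1 INT);

-- Back to the Spark 3.0 default: the table below uses spark.sql.sources.default.
SET spark.sql.legacy.createHiveTableByDefault.enabled=false;
CREATE TABLE ds_tab (c1 INT);
```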

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 21 additions & 0 deletions
```diff
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -48,6 +49,26 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(parsePlan(sql), expected, checkAnalysis = false)
   }
 
+  test("SPARK-30098: create table without provider should " +
+    "use default data source under non-legacy mode") {
+    val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING)"
+    val defaultProvider = conf.defaultDataSourceName
+    val expectedPlan = CreateTableStatement(
+      Seq("my_tab"),
+      new StructType()
+        .add("a", IntegerType, nullable = true, "test")
+        .add("b", StringType),
+      Seq.empty[Transform],
+      None,
+      Map.empty[String, String],
+      defaultProvider,
+      Map.empty[String, String],
+      None,
+      None,
+      false)
+    parseCompare(createSql, expectedPlan)
+  }
+
   test("create/replace table using - schema") {
     val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
     val replaceSql = "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
```

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 3 additions & 10 deletions
```diff
@@ -187,22 +187,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     if (external) {
       operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
     }
-
-    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
-    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
-    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
-    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
-    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
-    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
-
     if (ifNotExists) {
       // Unlike CREATE TEMPORARY VIEW USING, CREATE TEMPORARY TABLE USING does not support
       // IF NOT EXISTS. Users are not allowed to replace the existing temp table.
       operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
     }
 
-    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    val provider = ctx.tableProvider.multipartIdentifier.getText
+    val (_, _, _, options, _, _) = visitCreateTableClauses(ctx.createTableClauses())
+    val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse(
+      throw new ParseException("CREATE TEMPORARY TABLE without a provider is not allowed.", ctx))
     val schema = Option(ctx.colTypeList()).map(createSchema)
 
     logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
```
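
One consequence of the change above: `CREATE TEMPORARY TABLE` still requires an explicit provider, so omitting `USING` there should now fail at parse time instead of silently picking a default. A hedged sketch (hypothetical names and path; the `USING` form itself remains deprecated in favor of `CREATE TEMPORARY VIEW ... USING`):

```sql
-- Expected to fail with:
--   CREATE TEMPORARY TABLE without a provider is not allowed.
CREATE TEMPORARY TABLE tmp_no_provider (c1 INT);

-- The explicit-provider form still parses (with a deprecation warning);
-- the path below is a placeholder for existing parquet data.
CREATE TEMPORARY TABLE tmp_with_provider (c1 INT)
USING parquet OPTIONS (path '/tmp/existing_parquet_data');
```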

sql/core/src/test/resources/sql-tests/results/postgreSQL/create_view.sql.out

Lines changed: 2 additions & 3 deletions
Because `view_base_table` is now created as a regular datasource table (no Hive support needed), query 4 succeeds with empty output, and query 5 gets far enough to fail on the view's ungrouped `data` column instead of on a missing table:

```diff
@@ -46,8 +46,7 @@ CREATE TABLE view_base_table (key int /* PRIMARY KEY */, data varchar(20))
 -- !query 4 schema
 struct<>
 -- !query 4 output
-org.apache.spark.sql.AnalysisException
-Hive support is required to CREATE Hive TABLE (AS SELECT);
+
 
 
 -- !query 5
@@ -57,7 +56,7 @@ CREATE VIEW key_dependent_view AS
 struct<>
 -- !query 5 output
 org.apache.spark.sql.AnalysisException
-Table or view not found: view_base_table; line 2 pos 17
+expression 'default.view_base_table.`data`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
 
 
 -- !query 6
```

sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -159,7 +159,7 @@ class SparkSqlParserSuite extends AnalysisTest {
   }
 
   test("create table - schema") {
-    assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING)",
+    assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) STORED AS textfile",
       createTable(
         table = "my_tab",
         schema = (new StructType)
@@ -179,7 +179,8 @@ class SparkSqlParserSuite extends AnalysisTest {
         partitionColumnNames = Seq("c", "d")
       )
     )
-    assertEqual("CREATE TABLE my_tab(id BIGINT, nested STRUCT<col1: STRING,col2: INT>)",
+    assertEqual("CREATE TABLE my_tab(id BIGINT, nested STRUCT<col1: STRING,col2: INT>) " +
+      "STORED AS textfile",
       createTable(
         table = "my_tab",
         schema = (new StructType)
```
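
These tests previously exercised the Hive `CREATE TABLE` path implicitly; with the new default they add `STORED AS textfile` to keep targeting it. In user terms (a hedged sketch with hypothetical names, assuming Hive support is available), an explicit serde clause still forces a Hive table regardless of the new default:

```sql
-- The STORED AS clause selects the Hive serde path independently of
-- spark.sql.sources.default.
CREATE TABLE hive_text_tab (c1 INT) STORED AS textfile;

-- Without a provider or serde clause, the default datasource is used instead.
CREATE TABLE ds_default_tab (c1 INT);
```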

0 commit comments
