
[SPARK-30098][SQL] Use default datasource as provider for CREATE TABLE syntax #26736


Status: Closed

Changes from all commits (57 commits, all authored by Ngone51):
0be3a59  change CREATE TABLE default provider (Dec 2, 2019)
00fdf32  remove unsed StaticSQLConf (Dec 2, 2019)
2451d2f  nit (Dec 2, 2019)
870900c  rename config (Dec 2, 2019)
70fd3df  resolve conflict (Dec 2, 2019)
4177654  fix DDLParserSuite (Dec 3, 2019)
944336b  fix SparkSqlParserSuite.create table - schema (Dec 3, 2019)
8f91010  fix InMemoryCatalogedDDLSuite (Dec 3, 2019)
bd0134e  fix HiveCompatibilitySuite (Dec 3, 2019)
0c05d4d  fix CachedTableSuite (Dec 3, 2019)
7b0aeef  fix HiveDDLSuite (Dec 3, 2019)
fabe532  fix HiveExplainSuite (Dec 3, 2019)
d9c9932  fix HiveQuerySuite (Dec 3, 2019)
dbffd31  fix HiveResolutionSuite (Dec 3, 2019)
f254e15  fix HiveSerDeSuite (Dec 3, 2019)
82e0e44  fix HiveTableScanSuite (Dec 3, 2019)
d9a0387  fix HiveTypeCoercionSuite (Dec 3, 2019)
fd6af36  fix HiveUDFSuite (Dec 3, 2019)
86954d8  fix PruningSuite (Dec 3, 2019)
939d1d7  fix SQLQuerySuite (Dec 3, 2019)
3b29064  fix HiveCatalogedDDLSuite (Dec 3, 2019)
b75d57f  fix UDFSuite.scala (Dec 3, 2019)
bc45423  fix TestHiveSuite (Dec 3, 2019)
f8a7543  fix StatisticsSuite (Dec 3, 2019)
72e941d  fix InsertSuite (Dec 3, 2019)
662eecd  fix HiveUserDefinedTypeSuite (Dec 3, 2019)
d0b1a30  fix HiveParquetSuite (Dec 3, 2019)
f1b1add  fix HiveMetastoreCatalogSuite (Dec 3, 2019)
0fc1298  fix DataSourceWithHiveMetastoreCatalogSuite (Dec 3, 2019)
e97ee90  fix ~HiveThriftServer2Suites (Dec 3, 2019)
e4c088a  fix ~HiveThriftHttpServerSuite (Dec 3, 2019)
01c9903  fix ErrorPositionSuite (Dec 3, 2019)
2fd5691  fix ~HiveShowCreateTableSuite (Dec 3, 2019)
5942eb2  fix pgSQl/create_view_sql q4 (Dec 3, 2019)
2b6deb0  fix WindowQuerySuite (Dec 3, 2019)
5a1f55a  fix HiveWindowFunctionQuerySuite (Dec 3, 2019)
dc90038  remove my todo (Dec 3, 2019)
b5263ce  reword error message for EXTERNAL (Dec 4, 2019)
9255bec  update postgreSQL/create_view.sql query 4/5/6 (Dec 4, 2019)
5002b27  improve createTableClauses (Dec 4, 2019)
ebc65d5  update visitReplaceTable (Dec 4, 2019)
2a2a0ef  update create table tests in command.DDLParserSuite (Dec 4, 2019)
e564b37  fix CliSuite (Dec 4, 2019)
1e20936  fix HiveThriftServer2Suites (Dec 4, 2019)
248d2e7  improve DDLParserSuite CATS #3 (Dec 4, 2019)
124d5e4  make sure TestHive create hive table (Dec 5, 2019)
078495e  revert legacy mode (Dec 5, 2019)
7bb2811  revert to partitioned columns (Dec 5, 2019)
368c777  avoid harcoded conf name (Dec 5, 2019)
4b17927  fix HiveCommandSuite (Dec 5, 2019)
d46d6db  resolve conflict (Dec 5, 2019)
fb4a186  fix CreateTableAsSelectSuite (Dec 5, 2019)
cae3571  fix and revert (Dec 5, 2019)
fc9b910  another check (Dec 5, 2019)
d4b53e6  fix HiveShowCreateTableSuite (Dec 6, 2019)
35175b0  nit indent (Dec 6, 2019)
a201307  show default behaviour (Dec 6, 2019)
docs/sql-migration-guide.md (2 changes: 2 additions, 0 deletions)

@@ -253,6 +253,8 @@ license: |
</td>
</tr>
</table>
+
+- Since Spark 3.0, CREATE TABLE without a specific provider will use the value of `spark.sql.sources.default` as its provider. In Spark version 2.4 and earlier, it was hive. To restore the behavior before Spark 3.0, you can set `spark.sql.legacy.createHiveTableByDefault.enabled` to `true`.

## Upgrading from Spark SQL 2.4 to 2.4.1

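To make the change concrete, here is a minimal spark-shell sketch (a hypothetical session; the table names are illustrative, and the `parquet` value assumes the stock default of `spark.sql.sources.default`):

    // Spark 3.0 default: no USING/STORED AS clause, so the table gets the
    // default data source as its provider.
    spark.sql("CREATE TABLE t_new(a INT)")
    spark.sql("DESCRIBE TABLE EXTENDED t_new").show(false)   // Provider: parquet

    // Restore the 2.4 behavior: a provider-less CREATE TABLE makes a Hive table.
    spark.conf.set("spark.sql.legacy.createHiveTableByDefault.enabled", "true")
    spark.sql("CREATE TABLE t_old(a INT)")
    spark.sql("DESCRIBE TABLE EXTENDED t_old").show(false)   // Provider: hive
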
SqlBase.g4

@@ -29,6 +29,12 @@ grammar SqlBase;
*/
public boolean legacy_exponent_literal_as_decimal_enabled = false;

+  /**
+   * When false, CREATE TABLE syntax without a provider will use
+   * the value of spark.sql.sources.default as its provider.
+   */
+  public boolean legacy_create_hive_table_by_default_enabled = false;
+
/**
* Verify whether current token is a valid decimal token (which contains dot).
* Returns true if the character that follows the token is not a digit or letter or underscore.
@@ -101,13 +107,13 @@ statement
(RESTRICT | CASCADE)? #dropNamespace
| SHOW (DATABASES | NAMESPACES) ((FROM | IN) multipartIdentifier)?
(LIKE? pattern=STRING)? #showNamespaces
-    | createTableHeader ('(' colTypeList ')')? tableProvider
-        ((OPTIONS options=tablePropertyList) |
-        (PARTITIONED BY partitioning=transformList) |
-        bucketSpec |
-        locationSpec |
-        (COMMENT comment=STRING) |
-        (TBLPROPERTIES tableProps=tablePropertyList))*
-        (AS? query)? #createTable
+    | {!legacy_create_hive_table_by_default_enabled}?
+        createTableHeader ('(' colTypeList ')')? tableProvider?
+        createTableClauses
+        (AS? query)? #createTable
+    | {legacy_create_hive_table_by_default_enabled}?
+        createTableHeader ('(' colTypeList ')')? tableProvider
+        createTableClauses
+        (AS? query)? #createTable
| createTableHeader ('(' columns=colTypeList ')')?
((COMMENT comment=STRING) |
@@ -128,12 +134,7 @@ statement
locationSpec |
(TBLPROPERTIES tableProps=tablePropertyList))* #createTableLike
| replaceTableHeader ('(' colTypeList ')')? tableProvider
-        ((OPTIONS options=tablePropertyList) |
-        (PARTITIONED BY partitioning=transformList) |
-        bucketSpec |
-        locationSpec |
-        (COMMENT comment=STRING) |
-        (TBLPROPERTIES tableProps=tablePropertyList))*
+        createTableClauses
(AS? query)? #replaceTable
| ANALYZE TABLE multipartIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)? #analyze
@@ -352,6 +353,15 @@ tableProvider
: USING multipartIdentifier
;

+createTableClauses
+    :((OPTIONS options=tablePropertyList) |
+     (PARTITIONED BY partitioning=transformList) |
+     bucketSpec |
+     locationSpec |
+     (COMMENT comment=STRING) |
+     (TBLPROPERTIES tableProps=tablePropertyList))*
+    ;

tablePropertyList
: '(' tableProperty (',' tableProperty)* ')'
;
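
Why two near-identical #createTable alternatives: the {...}? blocks are ANTLR semantic predicates, so the legacy flag selects an alternative at parse time. With the flag off (the new default) the first alternative is active and tableProvider? is optional; with the flag on, tableProvider is mandatory in the active alternative, so a provider-less CREATE TABLE falls through to the legacy Hive CREATE TABLE syntax further down the rule. A hedged sketch of the observable difference (hypothetical spark-shell session):

    // Flag off: matches the first alternative even without USING; AstBuilder
    // later fills in spark.sql.sources.default as the provider.
    val plan = spark.sessionState.sqlParser.parsePlan("CREATE TABLE t(a INT)")
    println(plan)   // a CreateTableStatement carrying the default provider (illustrative)

    // Flag on: the first alternative is disabled and tableProvider becomes
    // mandatory, so the same SQL is parsed by the Hive CREATE TABLE rule instead.
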
AstBuilder.scala

@@ -2372,6 +2372,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
*/
type TableHeader = (Seq[String], Boolean, Boolean, Boolean)

+  /**
+   * Type to keep track of table clauses:
+   * (partitioning, bucketSpec, options, locationSpec, properties, comment).
+   */
+  type TableClauses = (Seq[Transform], Option[BucketSpec], Map[String, String],
+    Map[String, String], Option[String], Option[String])

/**
* Validate a create table statement and return the [[TableIdentifier]].
*/
@@ -2607,6 +2614,24 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
ctx.EXTENDED != null)
}

+  override def visitCreateTableClauses(ctx: CreateTableClausesContext): TableClauses = {
+    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
+    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
+    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
+
+    val partitioning: Seq[Transform] =
+      Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil)
+    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
+    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
+    val comment = Option(ctx.comment).map(string)
+    (partitioning, bucketSpec, properties, options, location, comment)
+  }

/**
* Create a table, returning a [[CreateTableStatement]] logical plan.
*
@@ -2632,26 +2657,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
if (external) {
-      operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
+      operationNotAllowed("CREATE EXTERNAL TABLE ...", ctx)
}

-    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
-    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
-    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
-    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
-    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
-    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
-
val schema = Option(ctx.colTypeList()).map(createSchema)
-    val partitioning: Seq[Transform] =
-      Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil)
-    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
-    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-
-    val provider = ctx.tableProvider.multipartIdentifier.getText
-    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
-    val comment = Option(ctx.comment).map(string)
+    val defaultProvider = conf.defaultDataSourceName
+    val provider =
+      Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse(defaultProvider)
+    val (partitioning, bucketSpec, properties, options, location, comment) =
+      visitCreateTableClauses(ctx.createTableClauses())

Option(ctx.query).map(plan) match {
case Some(_) if temp =>
@@ -2706,23 +2719,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
operationNotAllowed("REPLACE EXTERNAL TABLE ... USING", ctx)
}

-    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
-    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
-    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
-    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
-    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
-    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
-
+    val (partitioning, bucketSpec, properties, options, location, comment) =
+      visitCreateTableClauses(ctx.createTableClauses())
val schema = Option(ctx.colTypeList()).map(createSchema)
-    val partitioning: Seq[Transform] =
-      Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil)
-    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
-    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-
-    val provider = ctx.tableProvider.multipartIdentifier.getText
-    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
-    val comment = Option(ctx.comment).map(string)
val orCreate = ctx.replaceTableHeader().CREATE() != null

Option(ctx.query).map(plan) match {
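
Factoring the clause handling into visitCreateTableClauses keeps the duplicate-clause validation in one place for both CREATE TABLE and REPLACE TABLE. A hypothetical session showing what checkDuplicateClauses rejects (error text abridged and illustrative):

    // The grammar's trailing `*` accepts the clauses in any order and any
    // number of times; the builder then rejects the duplicates.
    spark.sql("CREATE TABLE t(a INT) USING parquet COMMENT 'one' COMMENT 'two'")
    // org.apache.spark.sql.catalyst.parser.ParseException:
    // Operation not allowed: Found duplicate clauses: COMMENT
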
ParseDriver.scala

@@ -101,6 +101,7 @@ abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Logging
lexer.addErrorListener(ParseErrorListener)
lexer.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
lexer.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
+    lexer.legacy_create_hive_table_by_default_enabled = conf.createHiveTableByDefaultEnabled
lexer.SQL_standard_keyword_behavior = SQLStandardKeywordBehavior

val tokenStream = new CommonTokenStream(lexer)
@@ -110,6 +111,7 @@ abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Logging
parser.addErrorListener(ParseErrorListener)
parser.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
+    parser.legacy_create_hive_table_by_default_enabled = conf.createHiveTableByDefaultEnabled
parser.SQL_standard_keyword_behavior = SQLStandardKeywordBehavior

try {
SQLConf.scala

@@ -1961,6 +1961,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

+  val LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED =
+    buildConf("spark.sql.legacy.createHiveTableByDefault.enabled")
+      .internal()
+      .doc("When set to true, CREATE TABLE syntax without a provider will use hive " +
+        s"instead of the value of ${DEFAULT_DATA_SOURCE_NAME.key}.")
+      .booleanConf
+      .createWithDefault(false)
+
val LEGACY_INTEGRALDIVIDE_RETURN_LONG = buildConf("spark.sql.legacy.integralDivide.returnBigint")
.doc("If it is set to true, the div operator returns always a bigint. This behavior was " +
"inherited from Hive. Otherwise, the return type is the data type of the operands.")
@@ -2578,6 +2587,9 @@ class SQLConf extends Serializable with Logging {
def exponentLiteralAsDecimalEnabled: Boolean =
getConf(SQLConf.LEGACY_EXPONENT_LITERAL_AS_DECIMAL_ENABLED)

+  def createHiveTableByDefaultEnabled: Boolean =
+    getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED)

def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG)

def nameNonStructGroupingKeyAsValue: Boolean =
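
Both knobs involved can be read back at runtime; a small sketch (printed values assume stock defaults):

    println(spark.conf.get("spark.sql.sources.default"))                          // parquet
    println(spark.conf.get("spark.sql.legacy.createHiveTableByDefault.enabled"))  // false
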
DDLParserSuite.scala

@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

@@ -48,6 +49,26 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(parsePlan(sql), expected, checkAnalysis = false)
}

+  test("SPARK-30098: create table without provider should " +
+    "use default data source under non-legacy mode") {
+    val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING)"
+    val defaultProvider = conf.defaultDataSourceName
+    val expectedPlan = CreateTableStatement(
+      Seq("my_tab"),
+      new StructType()
+        .add("a", IntegerType, nullable = true, "test")
+        .add("b", StringType),
+      Seq.empty[Transform],
+      None,
+      Map.empty[String, String],
+      defaultProvider,
+      Map.empty[String, String],
+      None,
+      None,
+      false)
+    parseCompare(createSql, expectedPlan)
+  }

test("create/replace table using - schema") {
val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
val replaceSql = "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
SparkSqlParser.scala

@@ -187,22 +187,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
if (external) {
operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
}

-    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
-    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
-    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
-    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
-    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
-    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
-
if (ifNotExists) {
// Unlike CREATE TEMPORARY VIEW USING, CREATE TEMPORARY TABLE USING does not support
// IF NOT EXISTS. Users are not allowed to replace the existing temp table.
operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
}

-    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    val provider = ctx.tableProvider.multipartIdentifier.getText
+    val (_, _, _, options, _, _) = visitCreateTableClauses(ctx.createTableClauses())
+    val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse(
+      throw new ParseException("CREATE TEMPORARY TABLE without a provider is not allowed.", ctx))
val schema = Option(ctx.colTypeList()).map(createSchema)

logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
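
Because the provider is now syntactically optional, the temporary-table path (which has no Hive fallback) must reject its absence explicitly. A hypothetical session; the table name is illustrative, but the error text is the one added in this diff:

    spark.sql("CREATE TEMPORARY TABLE tmp(a INT)")
    // org.apache.spark.sql.catalyst.parser.ParseException:
    // CREATE TEMPORARY TABLE without a provider is not allowed.
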
postgreSQL/create_view.sql.out

@@ -46,8 +46,7 @@ CREATE TABLE view_base_table (key int /* PRIMARY KEY */, data varchar(20))
-- !query 4 schema
struct<>
-- !query 4 output
-org.apache.spark.sql.AnalysisException
-Hive support is required to CREATE Hive TABLE (AS SELECT);



-- !query 5
Expand All @@ -57,7 +56,7 @@ CREATE VIEW key_dependent_view AS
struct<>
-- !query 5 output
org.apache.spark.sql.AnalysisException
-Table or view not found: view_base_table; line 2 pos 17
+expression 'default.view_base_table.`data`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;


-- !query 6
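
Note on these golden-file changes: query 4 (the CREATE TABLE above) used to fail in the non-Hive test session with "Hive support is required to CREATE Hive TABLE (AS SELECT)"; with the default provider it now succeeds, so query 5 gets past name resolution and surfaces a genuine analysis error (a grouping mistake in the view body) instead of "Table or view not found".
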
SparkSqlParserSuite.scala

@@ -159,7 +159,7 @@ class SparkSqlParserSuite extends AnalysisTest {
}

test("create table - schema") {
-    assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING)",
+    assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) STORED AS textfile",
createTable(
table = "my_tab",
schema = (new StructType)
@@ -179,7 +179,8 @@ class SparkSqlParserSuite extends AnalysisTest {
partitionColumnNames = Seq("c", "d")
)
)
-    assertEqual("CREATE TABLE my_tab(id BIGINT, nested STRUCT<col1: STRING,col2: INT>)",
+    assertEqual("CREATE TABLE my_tab(id BIGINT, nested STRUCT<col1: STRING,col2: INT>) " +
+      "STORED AS textfile",
createTable(
table = "my_tab",
schema = (new StructType)