[SPARK-22934] [SQL] Make optional clauses order insensitive for CREATE TABLE SQL statement #20133
@@ -383,23 +383,34 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * {{{
    *   CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
    *   USING table_provider
-   *   [OPTIONS table_property_list]
-   *   [PARTITIONED BY (col_name, col_name, ...)]
-   *   [CLUSTERED BY (col_name, col_name, ...)
-   *     [SORTED BY (col_name [ASC|DESC], ...)]
-   *     INTO num_buckets BUCKETS
-   *   ]
-   *   [LOCATION path]
-   *   [COMMENT table_comment]
-   *   [TBLPROPERTIES (property_name=property_value, ...)]
+   *   create_table_clauses
    *   [[AS] select_statement];
+   *
+   *   create_table_clauses (order insensitive):
+   *     [OPTIONS table_property_list]
+   *     [PARTITIONED BY (col_name, col_name, ...)]
+   *     [CLUSTERED BY (col_name, col_name, ...)
+   *       [SORTED BY (col_name [ASC|DESC], ...)]
+   *       INTO num_buckets BUCKETS
+   *     ]
+   *     [LOCATION path]
+   *     [COMMENT table_comment]
+   *     [TBLPROPERTIES (property_name=property_value, ...)]
    * }}}
    */
   override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
     if (external) {
       operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
     }

+    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+    checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
+    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
+    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
+
     val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
     val schema = Option(ctx.colTypeList()).map(createSchema)
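For illustration only (not part of the diff): with the relaxed grammar, the clauses listed under create_table_clauses can be written in any order. A minimal sketch assuming a local SparkSession; the table names, path, and option values are made up.

import org.apache.spark.sql.SparkSession

object OrderInsensitiveCreateTableDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("order-insensitive-create-table")
      .getOrCreate()

    // Clauses in the order the old grammar required.
    spark.sql(
      """CREATE TABLE demo_a (a INT, b STRING)
        |USING parquet
        |OPTIONS (compression 'snappy')
        |PARTITIONED BY (b)
        |LOCATION '/tmp/demo_a'
        |COMMENT 'demo table'
        |TBLPROPERTIES ('owner' = 'me')
      """.stripMargin)

    // Same clauses, shuffled: with this change both statements should parse successfully.
    spark.sql(
      """CREATE TABLE demo_b (a INT, b STRING)
        |USING parquet
        |TBLPROPERTIES ('owner' = 'me')
        |COMMENT 'demo table'
        |LOCATION '/tmp/demo_b'
        |PARTITIONED BY (b)
        |OPTIONS (compression 'snappy')
      """.stripMargin)

    spark.stop()
  }
}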
@@ -408,9 +419,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       .map(visitIdentifierList(_).toArray)
       .getOrElse(Array.empty[String])
     val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
+    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)

-    val location = Option(ctx.locationSpec).map(visitLocationSpec)
+    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
     val storage = DataSource.buildStorageFormatFromOptions(options)

     if (location.isDefined && storage.locationUri.isDefined) {
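The change above swaps Option(...) on a single grammar node for asScala.headOption on a node list, because the relaxed grammar now hands back every occurrence of a clause. A tiny stand-alone illustration of that idiom; the list contents are just placeholder strings, not real parser nodes.

import java.util.{Arrays, Collections}
import scala.collection.JavaConverters._

object HeadOptionIdiom extends App {
  // An absent clause now arrives as an empty java.util.List, a present one as a
  // single-element list (duplicates having been rejected by the checks above).
  val absentClause: java.util.List[String] = Collections.emptyList()
  val presentClause: java.util.List[String] = Arrays.asList("LOCATION '/tmp/t'")

  assert(absentClause.asScala.headOption.isEmpty)
  assert(presentClause.asScala.headOption.contains("LOCATION '/tmp/t'"))
}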
@@ -1087,13 +1098,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * {{{
    *   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
    *   [(col1[:] data_type [COMMENT col_comment], ...)]
-   *   [COMMENT table_comment]
-   *   [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
-   *   [ROW FORMAT row_format]
-   *   [STORED AS file_format]
-   *   [LOCATION path]
-   *   [TBLPROPERTIES (property_name=property_value, ...)]
+   *   create_table_clauses
    *   [AS select_statement];
+   *
+   *   create_table_clauses (order insensitive):
+   *     [COMMENT table_comment]
+   *     [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
+   *     [ROW FORMAT row_format]
+   *     [STORED AS file_format]
+   *     [LOCATION path]
+   *     [TBLPROPERTIES (property_name=property_value, ...)]
    * }}}
    */
   override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = withOrigin(ctx) {
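As with the data source syntax above, the Hive clauses under create_table_clauses may now appear in any order. An illustrative sketch, assuming a Spark build with Hive support on the classpath; the table name, path, and properties are made up.

import org.apache.spark.sql.SparkSession

object HiveOrderInsensitiveDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("hive-order-insensitive-create-table")
      .enableHiveSupport()
      .getOrCreate()

    // STORED AS / COMMENT / PARTITIONED BY / LOCATION / TBLPROPERTIES reordered freely.
    spark.sql(
      """CREATE EXTERNAL TABLE hive_demo (a INT, b STRING)
        |STORED AS textfile
        |COMMENT 'demo table'
        |PARTITIONED BY (p STRING)
        |LOCATION '/tmp/hive_demo'
        |TBLPROPERTIES ('owner' = 'me')
      """.stripMargin)

    spark.stop()
  }
}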
@@ -1104,28 +1118,36 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
         "CREATE TEMPORARY TABLE is not supported yet. " +
           "Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
     }
-    if (ctx.skewSpec != null) {
+    if (ctx.skewSpec.size > 0) {
       operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
     }

+    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+    checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+    checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+    checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
+    checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx)
+    checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
+    checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
+
     val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
     val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
-    val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val selectQuery = Option(ctx.query).map(plan)
-    val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
+    val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)

     // Note: Hive requires partition columns to be distinct from the schema, so we need
     // to include the partition columns here explicitly
     val schema = StructType(dataCols ++ partitionCols)

     // Storage format
     val defaultStorage = HiveSerDe.getDefaultStorage(conf)
-    validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
-    val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
+    validateRowFormatFileFormat(ctx.rowFormat.asScala, ctx.createFileFormat.asScala, ctx)
+    val fileStorage = ctx.createFileFormat.asScala.headOption.map(visitCreateFileFormat)
       .getOrElse(CatalogStorageFormat.empty)
-    val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
+    val rowStorage = ctx.rowFormat.asScala.headOption.map(visitRowFormat)
       .getOrElse(CatalogStorageFormat.empty)
-    val location = Option(ctx.locationSpec).map(visitLocationSpec)
+    val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
     // If we are creating an EXTERNAL table, then the LOCATION field is required
     if (external && location.isEmpty) {
       operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
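The checkDuplicateClauses helper called above is not shown in this diff. A hypothetical sketch of its shape, assuming it receives the ANTLR node list for one clause and rejects more than one occurrence; the signature, error type, and object name here are assumptions, not taken from the PR.

import java.util.{List => JList}

import org.antlr.v4.runtime.ParserRuleContext

object DuplicateClauseGuard {
  // Sketch only: the real builder would report the error through the parser
  // utilities, using ctx for error positioning.
  def checkDuplicateClauses[T](
      nodes: JList[T],
      clauseName: String,
      ctx: ParserRuleContext): Unit = {
    if (nodes.size() > 1) {
      throw new IllegalArgumentException(s"Found duplicate clauses: $clauseName")
    }
  }
}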
@@ -1180,7 +1202,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
         ctx)
     }

-    val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
+    val hasStorageProperties = (ctx.createFileFormat.size != 0) || (ctx.rowFormat.size != 0)

Review comment: shall we use
Reply: Sure

     if (conf.convertCTAS && !hasStorageProperties) {
       // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
       // are empty Maps.
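The switch from a null check to a size check matters because the relaxed grammar returns a (possibly empty) list per clause rather than a nullable single node. A quick stand-alone illustration with a plain Java list:

import java.util.Collections

object EmptyListVsNull extends App {
  // A missing ROW FORMAT / STORED AS clause now arrives as an empty list, never null,
  // so the old `!= null` test would always claim a storage clause is present.
  val rowFormat: java.util.List[String] = Collections.emptyList()

  assert(rowFormat != null)     // old-style check: passes even though no clause was given
  assert(rowFormat.size == 0)   // new-style check: correctly detects the absent clause
}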
@@ -1366,6 +1388,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       }
     }

+  private def validateRowFormatFileFormat(
+      rowFormatCtx: Seq[RowFormatContext],
+      createFileFormatCtx: Seq[CreateFileFormatContext],
+      parentCtx: ParserRuleContext): Unit = {
+    if (rowFormatCtx.size == 1 && createFileFormatCtx.size == 1) {
+      validateRowFormatFileFormat(rowFormatCtx.head, createFileFormatCtx.head, parentCtx)
+    }
+  }
+
Review comment: shall we just combine this method and the old
Reply: Will do it in a follow-up PR

   /**
    * Create or replace a view. This creates a [[CreateViewCommand]] command.
    *
Review comment: What's the meaning of ctx.tableProps now? The union of all table property lists?
Reply: The last one, if we have multiple clauses. However, we block this in the above checks.
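To illustrate the reply (not taken from the PR's tests): with the duplicate checks in place, repeating a clause should fail at parse time instead of silently keeping one of the values. The exact exception type and message are assumptions, so the sketch below catches a generic Exception.

import org.apache.spark.sql.SparkSession

object DuplicateClauseRejectionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("duplicate-clause-rejection")
      .getOrCreate()

    val duplicated =
      """CREATE TABLE dup_demo (a INT)
        |USING parquet
        |TBLPROPERTIES ('k' = 'v1')
        |TBLPROPERTIES ('k' = 'v2')
      """.stripMargin

    try {
      spark.sql(duplicated)
      println("Unexpectedly parsed a statement with duplicate TBLPROPERTIES")
    } catch {
      // Expected: the parser rejects the duplicated clause.
      case e: Exception => println(s"Rejected as expected: ${e.getMessage}")
    }

    spark.stop()
  }
}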