[SPARK-31707][SQL] Revert SPARK-30098 Use default datasource as provider for CREATE TABLE syntax #28517
Changes from all commits: a641886, 4f82115, 4cb2cc4, 8141aed, 84c172f
```diff
@@ -2196,21 +2196,20 @@ class DDLParserSuite extends AnalysisTest {
       CommentOnTable(UnresolvedTable(Seq("a", "b", "c")), "xYz"))
   }
 
-  test("create table - without using") {
-    withSQLConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key -> "false") {
-      val sql = "CREATE TABLE 1m.2g(a INT)"
-      val expectedTableSpec = TableSpec(
-        Seq("1m", "2g"),
-        Some(new StructType().add("a", IntegerType)),
-        Seq.empty[Transform],
-        None,
-        Map.empty[String, String],
-        None,
-        Map.empty[String, String],
-        None,
-        None)
-
-      testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
-    }
+  // TODO: ignored by SPARK-31707, restore the test after create table syntax unification
+  ignore("create table - without using") {
+    val sql = "CREATE TABLE 1m.2g(a INT)"
+    val expectedTableSpec = TableSpec(
+      Seq("1m", "2g"),
+      Some(new StructType().add("a", IntegerType)),
+      Seq.empty[Transform],
+      None,
+      Map.empty[String, String],
+      None,
+      Map.empty[String, String],
+      None,
+      None)
+
+    testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
   }
 }
```

Review discussion on this hunk:

- After a second thought, can we keep the test but ignore it? We still need these tests after the syntax unification is done. We can add a comment to reference the JIRA ticket.
- Same for the other removed tests.
- Yes, +1 for @cloud-fan's suggestion (ignoring tests instead of removing them).
- That's straightforward for the removed tests; I'll do that. What about the tests being reverted because Hive CREATE TABLE is not integrated with DSv2?
- +1. Maybe ignoring the tests and adding TODO comments to them is better.
- Some of them can't simply be changed from `test` to `ignore`, since we're removing the config. I'll comment those out instead.
- So, just remove the conf after the revert?
- Sorry, could you please elaborate? Removing the config is what this patch proposes, and it's part of the revert.
- I think he meant to remove
- Yeah, I see. That's one of the valid approaches and a good idea. Maybe then we need to remove some tests that depend on the config (two tests, for on and off).
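To make the "ignore with a TODO" approach from the thread above concrete, here is a minimal ScalaTest sketch; the suite name, test names, and bodies are illustrative, not the exact code in this PR:

```scala
import org.scalatest.funsuite.AnyFunSuite

class CreateTableSyntaxSuite extends AnyFunSuite {

  // TODO(SPARK-31707): restore this test once the CREATE TABLE syntaxes are unified.
  // `ignore` keeps the body compiled (so it cannot silently rot) but skips it at
  // runtime, and the runner reports it as ignored rather than dropping it silently.
  ignore("create table - without using") {
    // assertions that depend on the unified syntax would go here
  }

  test("a test that still runs") {
    assert(1 + 1 == 2)
  }
}
```

Note the caveat raised later in the thread: tests whose bodies reference the removed `LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED` constant would no longer compile, so `ignore` alone is not enough for those.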
```diff
@@ -256,8 +256,8 @@ class DataSourceV2SQLSuite
     checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
   }
 
-  test("CreateTable: without USING clause") {
-    spark.conf.set(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key, "false")
+  // TODO: ignored by SPARK-31707, restore the test after create table syntax unification
+  ignore("CreateTable: without USING clause") {
     // unset this config to use the default v2 session catalog.
     spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
     val testCatalog = catalog("testcat").asTableCatalog
```

Review comment: this test didn't exist before SPARK-30098 and depends on it.

```diff
@@ -681,8 +681,8 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("CreateTableAsSelect: without USING clause") {
-    spark.conf.set(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key, "false")
+  // TODO: ignored by SPARK-31707, restore the test after create table syntax unification
+  ignore("CreateTableAsSelect: without USING clause") {
     // unset this config to use the default v2 session catalog.
     spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
     val testCatalog = catalog("testcat").asTableCatalog
```

Review comment: likewise, this test didn't exist before SPARK-30098 and depends on it.
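The removed `spark.conf.set(...)` lines toggled a session conf for the rest of the suite. Spark's test utilities usually scope such toggles instead; a minimal self-contained sketch of that pattern (my own helper, not Spark's actual `SQLTestUtils` implementation) is:

```scala
import org.apache.spark.sql.SparkSession

// Set the given conf pairs, run `f`, then restore the previous values,
// so one test's settings cannot leak into the next.
def withSQLConf[T](spark: SparkSession)(pairs: (String, String)*)(f: => T): T = {
  val conf = spark.conf
  val saved = pairs.map { case (k, _) => k -> conf.getOption(k) }
  pairs.foreach { case (k, v) => conf.set(k, v) }
  try f
  finally saved.foreach {
    case (k, Some(old)) => conf.set(k, old)
    case (k, None)      => conf.unset(k)
  }
}
```

The `spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)` call in the tests above is the one-off form of the same idea: fall back to the default v2 session catalog by removing the override.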
```diff
@@ -944,7 +944,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
       withTable("t1") {
-        val plan = sql("CREATE TABLE t1 AS SELECT 1 col").queryExecution.executedPlan
+        val plan = sql("CREATE TABLE t1 USING parquet AS SELECT 1 col").queryExecution.executedPlan
         assert(plan.isInstanceOf[DataWritingCommandExec])
         assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec])
       }
```

Review comment: this test failed after the revert. It initially assumed that CREATE TABLE without USING follows the native syntax, which is no longer valid, so `USING parquet` was added to preserve the test's intent.

```diff
@@ -1005,7 +1005,7 @@ class AdaptiveQueryExecSuite
     }
     spark.sparkContext.addSparkListener(listener)
     try {
-      sql("CREATE TABLE t1 AS SELECT 1 col").collect()
+      sql("CREATE TABLE t1 USING parquet AS SELECT 1 col").collect()
       spark.sparkContext.listenerBus.waitUntilEmpty()
       assert(checkDone)
     } finally {
```

Review comment: same here.
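For context on why `USING parquet` is now required: after this revert, a bare CTAS is parsed as a Hive-serde table again, while an explicit `USING` keeps the native data source path that the AQE assertions rely on. A hedged sketch, assuming a Hive-enabled SparkSession `spark` and illustrative table names:

```scala
// With the legacy behavior restored by this revert, a bare CTAS creates a
// Hive serde (TextInputFormat) table again:
spark.sql("CREATE TABLE t_hive AS SELECT 1 AS col")

// An explicit provider keeps the native data source write path, which is what
// produces DataWritingCommandExec over an AdaptiveSparkPlanExec in this suite:
spark.sql("CREATE TABLE t_parquet USING parquet AS SELECT 1 AS col")
```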
```diff
@@ -40,8 +40,7 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
 class DDLParserSuite extends AnalysisTest with SharedSparkSession {
-  private lazy val parser = new SparkSqlParser(new SQLConf().copy(
-    SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED -> false))
+  private lazy val parser = new SparkSqlParser(new SQLConf)
 
   private def assertUnsupported(sql: String, containsThesePhrases: Seq[String] = Seq()): Unit = {
     val e = intercept[ParseException] {
```

Review comments:

- All of the changes in this test suite are done simply by reverting back to the pre-SPARK-30098 state.
- I had to remove the line
```diff
@@ -76,12 +75,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
     }.head
   }
 
-  private def withCreateTableStatement(sql: String)(prediction: CreateTableStatement => Unit)
-    : Unit = {
-    val statement = parser.parsePlan(sql).asInstanceOf[CreateTableStatement]
-    prediction(statement)
-  }
-
   test("alter database - property values must be set") {
     assertUnsupported(
       sql = "ALTER DATABASE my_db SET DBPROPERTIES('key_without_value', 'key_with_value'='x')",
```

Review comment: this helper checked the DSv2 path, but the Hive CREATE TABLE syntax doesn't go through it. In any case, it didn't exist before SPARK-30098.
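For reference, the `extractTableDesc` helper that the reverted tests fall back to looks roughly like this — a sketch reconstructed from the context lines above, not guaranteed to match the suite verbatim:

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.datasources.CreateTable

// Inside DDLParserSuite, where `parser` is the SparkSqlParser built above:
// parse the DDL and pull the CatalogTable out of the CreateTable node.
// The boolean is the IF NOT EXISTS flag, encoded as SaveMode.Ignore.
private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
  parser.parsePlan(sql).collect {
    case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore)
  }.head
}
```

This is why the reverted assertions inspect `desc.storage.inputFormat`, `desc.storage.serde`, and so on: the Hive path produces a `CatalogTable`, not a DSv2 `CreateTableStatement`.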
```diff
@@ -487,17 +480,21 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
 
   test("Test CTAS #3") {
     val s3 = """CREATE TABLE page_view AS SELECT * FROM src"""
-    val statement = parser.parsePlan(s3).asInstanceOf[CreateTableAsSelectStatement]
-    assert(statement.tableName(0) == "page_view")
-    assert(statement.asSelect == parser.parsePlan("SELECT * FROM src"))
-    assert(statement.partitioning.isEmpty)
-    assert(statement.bucketSpec.isEmpty)
-    assert(statement.properties.isEmpty)
-    assert(statement.provider.isEmpty)
-    assert(statement.options.isEmpty)
-    assert(statement.location.isEmpty)
-    assert(statement.comment.isEmpty)
-    assert(!statement.ifNotExists)
+    val (desc, exists) = extractTableDesc(s3)
+    assert(exists == false)
+    assert(desc.identifier.database == None)
+    assert(desc.identifier.table == "page_view")
+    assert(desc.tableType == CatalogTableType.MANAGED)
+    assert(desc.storage.locationUri == None)
+    assert(desc.schema.isEmpty)
+    assert(desc.viewText == None) // TODO will be SQLText
+    assert(desc.viewQueryColumnNames.isEmpty)
+    assert(desc.storage.properties == Map())
+    assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
+    assert(desc.storage.outputFormat ==
+      Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
+    assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+    assert(desc.properties == Map())
   }
 
   test("Test CTAS #4") {
```
```diff
@@ -657,60 +654,67 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
 
   test("create table - basic") {
     val query = "CREATE TABLE my_table (id int, name string)"
-    withCreateTableStatement(query) { state =>
-      assert(state.tableName(0) == "my_table")
-      assert(state.tableSchema == new StructType().add("id", "int").add("name", "string"))
-      assert(state.partitioning.isEmpty)
-      assert(state.bucketSpec.isEmpty)
-      assert(state.properties.isEmpty)
-      assert(state.provider.isEmpty)
-      assert(state.options.isEmpty)
-      assert(state.location.isEmpty)
-      assert(state.comment.isEmpty)
-      assert(!state.ifNotExists)
-    }
+    val (desc, allowExisting) = extractTableDesc(query)
+    assert(!allowExisting)
+    assert(desc.identifier.database.isEmpty)
+    assert(desc.identifier.table == "my_table")
+    assert(desc.tableType == CatalogTableType.MANAGED)
+    assert(desc.schema == new StructType().add("id", "int").add("name", "string"))
+    assert(desc.partitionColumnNames.isEmpty)
+    assert(desc.bucketSpec.isEmpty)
+    assert(desc.viewText.isEmpty)
+    assert(desc.viewQueryColumnNames.isEmpty)
+    assert(desc.storage.locationUri.isEmpty)
+    assert(desc.storage.inputFormat ==
+      Some("org.apache.hadoop.mapred.TextInputFormat"))
+    assert(desc.storage.outputFormat ==
+      Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
+    assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+    assert(desc.storage.properties.isEmpty)
+    assert(desc.properties.isEmpty)
+    assert(desc.comment.isEmpty)
   }
 
   test("create table - with database name") {
     val query = "CREATE TABLE dbx.my_table (id int, name string)"
-    withCreateTableStatement(query) { state =>
-      assert(state.tableName(0) == "dbx")
-      assert(state.tableName(1) == "my_table")
-    }
+    val (desc, _) = extractTableDesc(query)
+    assert(desc.identifier.database == Some("dbx"))
+    assert(desc.identifier.table == "my_table")
   }
 
   test("create table - temporary") {
     val query = "CREATE TEMPORARY TABLE tab1 (id int, name string)"
     val e = intercept[ParseException] { parser.parsePlan(query) }
-    assert(e.message.contains("CREATE TEMPORARY TABLE without a provider is not allowed."))
+    assert(e.message.contains("CREATE TEMPORARY TABLE is not supported yet"))
   }
 
   test("create table - external") {
     val query = "CREATE EXTERNAL TABLE tab1 (id int, name string) LOCATION '/path/to/nowhere'"
-    val e = intercept[ParseException] { parser.parsePlan(query) }
-    assert(e.message.contains("Operation not allowed: CREATE EXTERNAL TABLE ..."))
+    val (desc, _) = extractTableDesc(query)
+    assert(desc.tableType == CatalogTableType.EXTERNAL)
+    assert(desc.storage.locationUri == Some(new URI("/path/to/nowhere")))
   }
 
   test("create table - if not exists") {
     val query = "CREATE TABLE IF NOT EXISTS tab1 (id int, name string)"
-    withCreateTableStatement(query) { state =>
-      assert(state.ifNotExists)
-    }
+    val (_, allowExisting) = extractTableDesc(query)
+    assert(allowExisting)
   }
 
   test("create table - comment") {
     val query = "CREATE TABLE my_table (id int, name string) COMMENT 'its hot as hell below'"
-    withCreateTableStatement(query) { state =>
-      assert(state.comment == Some("its hot as hell below"))
-    }
+    val (desc, _) = extractTableDesc(query)
+    assert(desc.comment == Some("its hot as hell below"))
   }
 
   test("create table - partitioned columns") {
-    val query = "CREATE TABLE my_table (id int, name string) PARTITIONED BY (id)"
-    withCreateTableStatement(query) { state =>
-      val transform = IdentityTransform(FieldReference(Seq("id")))
-      assert(state.partitioning == Seq(transform))
-    }
+    val query = "CREATE TABLE my_table (id int, name string) PARTITIONED BY (month int)"
+    val (desc, _) = extractTableDesc(query)
+    assert(desc.schema == new StructType()
+      .add("id", "int")
+      .add("name", "string")
+      .add("month", "int"))
+    assert(desc.partitionColumnNames == Seq("month"))
   }
 
   test("create table - clustered by") {
```
```diff
@@ -726,22 +730,20 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
       """
 
     val query1 = s"$baseQuery INTO $numBuckets BUCKETS"
-    withCreateTableStatement(query1) { state =>
-      assert(state.bucketSpec.isDefined)
-      val bucketSpec = state.bucketSpec.get
-      assert(bucketSpec.numBuckets == numBuckets)
-      assert(bucketSpec.bucketColumnNames.head.equals(bucketedColumn))
-      assert(bucketSpec.sortColumnNames.isEmpty)
-    }
+    val (desc1, _) = extractTableDesc(query1)
+    assert(desc1.bucketSpec.isDefined)
+    val bucketSpec1 = desc1.bucketSpec.get
+    assert(bucketSpec1.numBuckets == numBuckets)
+    assert(bucketSpec1.bucketColumnNames.head.equals(bucketedColumn))
+    assert(bucketSpec1.sortColumnNames.isEmpty)
 
     val query2 = s"$baseQuery SORTED BY($sortColumn) INTO $numBuckets BUCKETS"
-    withCreateTableStatement(query2) { state =>
-      assert(state.bucketSpec.isDefined)
-      val bucketSpec = state.bucketSpec.get
-      assert(bucketSpec.numBuckets == numBuckets)
-      assert(bucketSpec.bucketColumnNames.head.equals(bucketedColumn))
-      assert(bucketSpec.sortColumnNames.head.equals(sortColumn))
-    }
+    val (desc2, _) = extractTableDesc(query2)
+    assert(desc2.bucketSpec.isDefined)
+    val bucketSpec2 = desc2.bucketSpec.get
+    assert(bucketSpec2.numBuckets == numBuckets)
+    assert(bucketSpec2.bucketColumnNames.head.equals(bucketedColumn))
+    assert(bucketSpec2.sortColumnNames.head.equals(sortColumn))
   }
 
   test("create table(hive) - skewed by") {
```
```diff
@@ -811,9 +813,8 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
 
   test("create table - properties") {
     val query = "CREATE TABLE my_table (id int, name string) TBLPROPERTIES ('k1'='v1', 'k2'='v2')"
-    withCreateTableStatement(query) { state =>
-      assert(state.properties == Map("k1" -> "v1", "k2" -> "v2"))
-    }
+    val (desc, _) = extractTableDesc(query)
+    assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2"))
   }
 
   test("create table(hive) - everything!") {
```
```diff
@@ -42,7 +42,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
   private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone
-  private val originalCreateHiveTable = TestHive.conf.createHiveTableByDefaultEnabled
 
   def testCases: Seq[(String, File)] = {
     hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
```

Review comment: effectively a no-op.

```diff
@@ -66,7 +65,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests
     // (timestamp_*)
     TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles")
-    TestHive.setConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED, true)
     RuleExecutor.resetMetrics()
   }
```

```diff
@@ -79,8 +77,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
     TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled)
     TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, originalSessionLocalTimeZone)
-    TestHive.setConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED,
-      originalCreateHiveTable)
 
     // For debugging dump some statistics about how much time was spent in various optimizer rules
     logWarning(RuleExecutor.dumpTimeSpent())
```

Review comment: this didn't exist before SPARK-30098 and depends on it.
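The hunks above each drop one entry from HiveCompatibilitySuite's conf save/restore pattern. For readers unfamiliar with it, here is a self-contained sketch of the pattern itself; a plain mutable map stands in for TestHive's conf, and all names are illustrative:

```scala
import scala.collection.mutable
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

// Stand-in for TestHive's SQLConf in this sketch.
object FakeConf {
  val settings: mutable.Map[String, String] =
    mutable.Map("spark.sql.session.timeZone" -> "UTC")
}

class ConfRestoreSuite extends AnyFunSuite with BeforeAndAfterAll {
  // Capture the original value before overriding it for the whole suite.
  private val originalTimeZone = FakeConf.settings("spark.sql.session.timeZone")

  override def beforeAll(): Unit = {
    super.beforeAll()
    FakeConf.settings("spark.sql.session.timeZone") = "America/Los_Angeles"
  }

  override def afterAll(): Unit = {
    // Restore in afterAll so later suites see the original value.
    try FakeConf.settings("spark.sql.session.timeZone") = originalTimeZone
    finally super.afterAll()
  }

  test("timezone-sensitive behavior") {
    assert(FakeConf.settings("spark.sql.session.timeZone") == "America/Los_Angeles")
  }
}
```

Removing `LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED` from this pattern is safe precisely because the config itself is deleted by the revert: there is no longer anything to save or restore.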