
[SPARK-22356][SQL] data source table should support overlapped columns between data and partition schema #19579

Closed · wants to merge 2 commits

Changes from all commits
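
To make the change concrete before the diff, here is a rough spark-shell reproduction of the scenario, modeled on the new test added below (the path and table name are made up; `spark` is the shell's SparkSession):

import spark.implicits._

// The data files themselves contain a column `p`, and the directory name `p=1`
// makes `p` a partition column as well, so data and partition schema overlap.
val path = "/tmp/spark-22356-demo"  // hypothetical location
Seq((1, 1, 1), (1, 2, 1)).toDF("i", "p", "j")
  .write.mode("overwrite").parquet(s"$path/p=1")

// Creating a data source table over that layout is exactly the case this PR handles:
// with this patch the table keeps the data schema's column order (i, p, j) and takes
// the value of `p` from the partition directory, as the new test asserts.
spark.sql(s"CREATE TABLE t USING parquet OPTIONS (path '$path')")
spark.table("t").show()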
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType

/**
 * A command used to create a data source table.
@@ -85,14 +86,32 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
}
}

    val newTable = table.copy(
      schema = dataSource.schema,
      partitionColumnNames = partitionColumnNames,
      // If metastore partition management for file source tables is enabled, we start off with
      // partition provider hive, but no partitions in the metastore. The user has to call
      // `msck repair table` to populate the table partitions.
      tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
        sessionState.conf.manageFilesourcePartitions)
    val newTable = dataSource match {
      // Since Spark 2.1, we store the inferred schema of a data source table in the metastore, to
      // avoid inferring the schema again at the read path. However, if the data source has
      // overlapped columns between the data and partition schema, we can't store it in the
      // metastore as it breaks the assumption of the table schema. Here we fall back to the
      // behavior of Spark prior to 2.1: store an empty schema in the metastore and infer it at
      // runtime. Note that this also means the new scalable partition handling feature
      // (introduced in Spark 2.1) is disabled in this case.
      case r: HadoopFsRelation if r.overlappedPartCols.nonEmpty =>
        logWarning("It is not recommended to create a table with overlapped data and partition " +
          "columns, as Spark cannot store a valid table schema and has to infer it at runtime, " +
          "which hurts performance. Please check your data files and remove the partition " +
          "columns from them.")
        table.copy(schema = new StructType(), partitionColumnNames = Nil)

      case _ =>
        table.copy(
          schema = dataSource.schema,
          partitionColumnNames = partitionColumnNames,
          // If metastore partition management for file source tables is enabled, we start off with
          // partition provider hive, but no partitions in the metastore. The user has to call
          // `msck repair table` to populate the table partitions.
          tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
            sessionState.conf.manageFilesourcePartitions)
    }

// We will return Nil or throw exception at the beginning if the table already exists, so when
// we reach here, the table should not exist and we should set `ignoreIfExists` to false.
sessionState.catalog.createTable(newTable, ignoreIfExists = false)
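
As an aside on what the fallback branch above means at the catalog level, here is a sketch (not part of the patch): it reuses the hypothetical table `t` from the example near the top, and note that `spark.sessionState` is only visible to code in the `org.apache.spark.sql` package, e.g. a test suite.

import org.apache.spark.sql.catalyst.TableIdentifier

// The stored catalog entry keeps an empty schema and no partition columns ...
val metadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(metadata.schema.isEmpty)
assert(metadata.partitionColumnNames.isEmpty)

// ... while the read path still infers the full schema from the files and the
// partition directories, so queries see the expected columns.
assert(spark.table("t").columns.sameElements(Array("i", "p", "j")))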
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources

import java.util.Locale

import scala.collection.mutable

import org.apache.spark.sql.{SparkSession, SQLContext}
@@ -50,15 +52,22 @@ case class HadoopFsRelation(

override def sqlContext: SQLContext = sparkSession.sqlContext

val schema: StructType = {
val getColName: (StructField => String) =
if (sparkSession.sessionState.conf.caseSensitiveAnalysis) _.name else _.name.toLowerCase
val overlappedPartCols = mutable.Map.empty[String, StructField]
partitionSchema.foreach { partitionField =>
if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
overlappedPartCols += getColName(partitionField) -> partitionField
}
private def getColName(f: StructField): String = {
if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
f.name
} else {
f.name.toLowerCase(Locale.ROOT)
}
}

val overlappedPartCols = mutable.Map.empty[String, StructField]
partitionSchema.foreach { partitionField =>
if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
overlappedPartCols += getColName(partitionField) -> partitionField
}
}

val schema: StructType = {
StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))
}
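
To illustrate what the merged `schema` above evaluates to, here is a small self-contained sketch that mirrors the logic with toy schemas (the schemas and the case-insensitive name function are assumptions for illustration, not code from the patch); it can be pasted into spark-shell or any REPL with Spark on the classpath:

import java.util.Locale
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Toy schemas: the data files already contain "P", which also shows up as partition column "p".
val dataSchema = StructType(Seq(
  StructField("i", IntegerType),
  StructField("P", IntegerType),
  StructField("j", IntegerType)))
val partitionSchema = StructType(Seq(StructField("p", IntegerType)))

// Mirrors getColName with case-sensitive analysis disabled.
def getColName(f: StructField): String = f.name.toLowerCase(Locale.ROOT)

// Partition columns whose (normalized) name also appears in the data schema.
val overlappedPartCols: Map[String, StructField] =
  partitionSchema
    .filter(pf => dataSchema.exists(df => getColName(df) == getColName(pf)))
    .map(pf => getColName(pf) -> pf).toMap

// Keep the data schema's column order, but use the partition-side field for overlapped
// names, then append the partition columns that do not overlap.
val schema = StructType(
  dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
    partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))

println(schema.fieldNames.mkString(", "))  // prints: i, p, j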
16 changes: 16 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2741,4 +2741,20 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
assert (aggregateExpressions.isDefined)
assert (aggregateExpressions.get.size == 2)
}

test("SPARK-22356: overlapped columns between data and partition schema in data source tables") {
withTempPath { path =>
Seq((1, 1, 1), (1, 2, 1)).toDF("i", "p", "j")
.write.mode("overwrite").parquet(new File(path, "p=1").getCanonicalPath)
withTable("t") {
sql(s"create table t using parquet options(path='${path.getCanonicalPath}')")
// We should respect the column order in data schema.
assert(spark.table("t").columns === Array("i", "p", "j"))
checkAnswer(spark.table("t"), Row(1, 1, 1) :: Row(1, 1, 1) :: Nil)
// The DESC TABLE should report same schema as table scan.
assert(sql("desc t").select("col_name")
.as[String].collect().mkString(",").contains("i,p,j"))
}
}
}
}
@@ -40,7 +40,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
private val tmpDataDir = Utils.createTempDir(namePrefix = "test-data")
// For local tests, you can set `sparkTestingDir` to a static value like `/tmp/test-spark`, to
// avoid downloading different versions of Spark in each run.
private val sparkTestingDir = Utils.createTempDir(namePrefix = "test-spark")
private val sparkTestingDir = new File("/tmp/test-spark")
private val unusedJar = TestUtils.createJarWithClasses(Seq.empty)

override def afterAll(): Unit = {
@@ -77,35 +77,38 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
super.beforeAll()

val tempPyFile = File.createTempFile("test", ".py")
// scalastyle:off line.size.limit
Files.write(tempPyFile.toPath,
s"""
|from pyspark.sql import SparkSession
|import os
|
|spark = SparkSession.builder.enableHiveSupport().getOrCreate()
|version_index = spark.conf.get("spark.sql.test.version.index", None)
|
|spark.sql("create table data_source_tbl_{} using json as select 1 i".format(version_index))
|
|spark.sql("create table hive_compatible_data_source_tbl_" + version_index + \\
| " using parquet as select 1 i")
|spark.sql("create table hive_compatible_data_source_tbl_{} using parquet as select 1 i".format(version_index))
|
|json_file = "${genDataDir("json_")}" + str(version_index)
|spark.range(1, 2).selectExpr("cast(id as int) as i").write.json(json_file)
|spark.sql("create table external_data_source_tbl_" + version_index + \\
| "(i int) using json options (path '{}')".format(json_file))
|spark.sql("create table external_data_source_tbl_{}(i int) using json options (path '{}')".format(version_index, json_file))
|
|parquet_file = "${genDataDir("parquet_")}" + str(version_index)
|spark.range(1, 2).selectExpr("cast(id as int) as i").write.parquet(parquet_file)
|spark.sql("create table hive_compatible_external_data_source_tbl_" + version_index + \\
| "(i int) using parquet options (path '{}')".format(parquet_file))
|spark.sql("create table hive_compatible_external_data_source_tbl_{}(i int) using parquet options (path '{}')".format(version_index, parquet_file))
|
|json_file2 = "${genDataDir("json2_")}" + str(version_index)
|spark.range(1, 2).selectExpr("cast(id as int) as i").write.json(json_file2)
|spark.sql("create table external_table_without_schema_" + version_index + \\
| " using json options (path '{}')".format(json_file2))
|spark.sql("create table external_table_without_schema_{} using json options (path '{}')".format(version_index, json_file2))
|
|parquet_file2 = "${genDataDir("parquet2_")}" + str(version_index)
|spark.range(1, 3).selectExpr("1 as i", "cast(id as int) as p", "1 as j").write.parquet(os.path.join(parquet_file2, "p=1"))
|spark.sql("create table tbl_with_col_overlap_{} using parquet options(path '{}')".format(version_index, parquet_file2))
|
|spark.sql("create view v_{} as select 1 i".format(version_index))
""".stripMargin.getBytes("utf8"))
// scalastyle:on line.size.limit

PROCESS_TABLES.testingVersions.zipWithIndex.foreach { case (version, index) =>
val sparkHome = new File(sparkTestingDir, s"spark-$version")
@@ -153,6 +156,7 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {
.enableHiveSupport()
.getOrCreate()
spark = session
import session.implicits._

testingVersions.indices.foreach { index =>
Seq(
@@ -194,6 +198,22 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {

// test permanent view
checkAnswer(sql(s"select i from v_$index"), Row(1))

      // SPARK-22356: overlapped columns between data and partition schema in data source tables
      val tbl_with_col_overlap = s"tbl_with_col_overlap_$index"
      // For Spark 2.2.0 and 2.1.x, the behavior is different from Spark 2.0.
      if (testingVersions(index).startsWith("2.1") || testingVersions(index) == "2.2.0") {
        spark.sql("msck repair table " + tbl_with_col_overlap)
        assert(spark.table(tbl_with_col_overlap).columns === Array("i", "j", "p"))
        checkAnswer(spark.table(tbl_with_col_overlap), Row(1, 1, 1) :: Row(1, 1, 1) :: Nil)
        assert(sql("desc " + tbl_with_col_overlap).select("col_name")
          .as[String].collect().mkString(",").contains("i,j,p"))
      } else {
        assert(spark.table(tbl_with_col_overlap).columns === Array("i", "p", "j"))
        checkAnswer(spark.table(tbl_with_col_overlap), Row(1, 1, 1) :: Row(1, 1, 1) :: Nil)
        assert(sql("desc " + tbl_with_col_overlap).select("col_name")
          .as[String].collect().mkString(",").contains("i,p,j"))
      }
}
}
}