
[SPARK-22245][SQL] partitioned data set should always put partition columns at the end #19471

Closed
wants to merge 1 commit

Conversation

@cloud-fan (Contributor) commented Oct 11, 2017

Background

In Spark SQL, partition columns always appear at the end of the schema, even with a user-specified schema:

scala> Seq(1->1).toDF("i", "j").write.partitionBy("i").parquet("/tmp/t")

scala> spark.read.parquet("/tmp/t").show
+---+---+
|  j|  i|
+---+---+
|  1|  1|
+---+---+

scala> spark.read.schema("i int, j int").parquet("/tmp/t").show
+---+---+
|  j|  i|
+---+---+
|  1|  1|
+---+---+

scala> spark.read.schema("j int, i int").parquet("/tmp/t").show
+---+---+
|  j|  i|
+---+---+
|  1|  1|
+---+---+

This behavior also aligns with tables:

scala> sql("create table t(i int, j int) using parquet partitioned by (i)")
res5: org.apache.spark.sql.DataFrame = []

scala> spark.table("t").printSchema
root
 |-- j: integer (nullable = true)
 |-- i: integer (nullable = true)

However, for historical reasons, Spark SQL supports partition columns appearing in data files: it respects the order of the partition columns in the data schema, but picks their values from the partition directories:

scala> Seq(1->1, 2 -> 1).toDF("i", "j").write.parquet("/tmp/t/i=1")

// You can see that the value of column i is always 1, so the values of partition columns are picked
// from the partition directories.
scala> spark.read.parquet("/tmp/t").show
17/10/11 16:28:28 WARN DataSource: Found duplicate column(s) in the data schema and the partition schema: `i`;
+---+---+
|  i|  j|
+---+---+
|  1|  1|
|  1|  1|
+---+---+

The behavior in this case is a little weird and causes problems when dealing with tables (with the Hive metastore):

// With user-specified schema, partition columns are always at the end now.
scala> spark.read.schema("i int, j int").parquet("/tmp/t").show
+---+---+
|  j|  i|
+---+---+
|  1|  1|
|  1|  1|
+---+---+

scala> spark.read.schema("j int, i int").parquet("/tmp/t").show
+---+---+
|  j|  i|
+---+---+
|  1|  1|
|  1|  1|
+---+---+

// `skipHiveMetadata=true` simulates a hive-incompatible schema.
scala> sql("create table t using parquet options(skipHiveMetadata=true) location '/tmp/t'")
17/10/11 16:57:00 WARN DataSource: Found duplicate column(s) in the data schema and the partition schema: `i`;
17/10/11 16:57:00 WARN HiveExternalCatalog: Persisting data source table `default`.`t` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:156)
  at org.apache.spark.sql.catalyst.catalog.CatalogTable.partitionSchema(interface.scala:242)
  at org.apache.spark.sql.hive.HiveExternalCatalog.newSparkSQLSpecificMetastoreTable$1(HiveExternalCatalog.scala:299)
...

The reason for this bug is that, when we respect the order of partition columns in the data schema, we get an invalid table schema, which breaks the assumption that partition columns should be at the end.
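
The failed assertion relies on the invariant that the partition columns are the trailing fields of the table schema. Here is a minimal sketch of that invariant, using a hypothetical helper rather than the actual Spark implementation:

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical helper mirroring the assumption: partition columns must be the
// last fields of the table schema, in the declared partition-column order.
def partitionSchemaOf(schema: StructType, partitionColumnNames: Seq[String]): StructType = {
  val tail = schema.fields.takeRight(partitionColumnNames.length)
  assert(tail.map(_.name).toSeq == partitionColumnNames,
    s"partition columns [${partitionColumnNames.mkString(", ")}] must be at the end of the schema")
  StructType(tail)
}

// Valid layout: partition column `i` is last.
partitionSchemaOf(StructType(Seq(StructField("j", IntegerType), StructField("i", IntegerType))), Seq("i"))

// Layout taken from the data files: `i` comes first, so the assertion fails.
partitionSchemaOf(StructType(Seq(StructField("i", IntegerType), StructField("j", IntegerType))), Seq("i"))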

Proposal

My proposal is: first, we should always put partition columns at the end, to have consistent behavior; second, we should ignore the partition columns in data files when dealing with tables. A sketch of the reordering is shown below.
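
A minimal sketch of the proposed normalization, with hypothetical helper names (not the actual patch): drop any partition columns from the data schema and append the partition schema, so the partition columns always end up last.

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical sketch: remove partition columns from the data schema and
// append the partition schema, so partition columns are always at the end.
def normalizeSchema(dataSchema: StructType, partitionSchema: StructType): StructType = {
  val partitionNames = partitionSchema.fieldNames.toSet
  val dataOnly = dataSchema.fields.filterNot(f => partitionNames.contains(f.name))
  StructType(dataOnly ++ partitionSchema.fields)
}

val dataSchema = StructType(Seq(StructField("i", IntegerType), StructField("j", IntegerType)))
val partitionSchema = StructType(Seq(StructField("i", IntegerType)))

normalizeSchema(dataSchema, partitionSchema).fieldNames
// => Array(j, i): the partition column `i` moves to the end.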

One problem is that we don't have the correct data/physical schema in the metastore and may fail to read non-self-describing file formats like CSV. I think this is really a corner case (having overlapping columns in the data and partition schemas), and the table schema can't have overlapping columns in the data and partition schemas (unless we hack it into table properties), so we don't have a better choice.

Another problem is that, for tables created before Spark 2.2, we may already have an invalid table schema in the metastore. We should handle this case and adjust the table schema before reading the table.

Changed behavior

No behavior change if there are no overlapping columns in the data and partition schemas.

The schema changes (partition columns go to the end) when reading a file-format data source with partition columns in the data files.

@maropu (Member) commented Oct 11, 2017

Does this change affect other tests for the overlapping cases, like DataStreamReaderWriterSuite and OrcPartitionDiscoverySuite? Since we already have a number of these tests in multiple places (I know you've already considered this aspect though...), I'm a little worried that this change in a minor release might confuse users.

@SparkQA commented Oct 11, 2017

Test build #82633 has finished for PR 19471 at commit ac7ae6b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor, Author)

waiting for more feedback before moving forward :)

Another thing I wanna point out: sql("create table t using parquet options(skipHiveMetadata=true) location '/tmp/t'") works in Spark 2.0, and the created table has a schema with the partition column at the beginning. In Spark 2.1, it also works, and DESC TABLE also shows the table schema with the partition column at the beginning. However, if you query the table, the output schema has the partition column at the end.

It's been a long time since Spark 2.1 was released and no one has reported this behavior change. It seems this is really a corner case, which makes me feel we should not complicate our code too much for it.

@maropu (Member) commented Oct 11, 2017

Fair enough to me. To check whether this change is reasonable, we might send an email to the dev/user list to gather feedback. I saw marmbrus doing so when adding the json API:
#15274 (comment)
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-SQL-JSON-Column-Support-td19132.html
If we get no responses, or only positive feedback, we could quickly and safely drop the support.

@dongjoon-hyun (Member)

+1 for this change. BTW, wow, there are lots of test case failures: 81 failures.

@SparkQA commented Oct 12, 2017

Test build #82671 has finished for PR 19471 at commit dea7037.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Oct 15, 2017

We may need to document this change in the Migration Guide in the SQL programming guide.

@gatorsmile (Member)

No behavior change if there are no overlapping columns in the data and partition schemas.

The schema changes (partition columns go to the end) when reading a file-format data source with partition columns in the data files.

@cloud-fan Could you check why so many test cases failed?

@cloud-fan (Contributor, Author)

closing in favor of #19579

@cloud-fan closed this Oct 26, 2017
@SparkQA commented Oct 26, 2017

Test build #83066 has finished for PR 19471 at commit d21ebaa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
