Skip to content

[SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-3575] [SQL] Parquet data source improvements #4308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from

Conversation

liancheng
Copy link
Contributor

@liancheng liancheng commented Feb 2, 2015

This PR adds three major improvements to Parquet data source:

  1. Partition discovery

    While reading Parquet files resides in Hive style partition directories, ParquetRelation2 automatically discovers partitioning information and infers partition column types.

    This is also a partial work for SPARK-5182, which aims to provide first class partitioning support for the data source API. Related code in this PR can be easily extracted to the data source API level in future versions.

  2. Schema merging

    When enabled, Parquet data source collects schema information from all Parquet part-files and tries to merge them. Exceptions are thrown when incompatible schemas are detected. This feature is controlled by data source option parquet.mergeSchema, and is enabled by default.

  3. Metastore Parquet table conversion moved to analysis phase

    This greatly simplifies the conversion logic. ParquetConversion strategy can be removed once the old Parquet implementation is removed in the future.

This version of Parquet data source aims to entirely replace the old Parquet implementation. However, the old version hasn't been removed yet. Users can fall back to the old version by turning off SQL configuration spark.sql.parquet.useDataSourceApi.

Other JIRA tickets fixed as side effects in this PR:

  • SPARK-5509: EqualTo now uses a proper Ordering to compare binary types.
  • SPARK-3575: Metastore schema is now preserved and passed to ParquetRelation2 via data source option parquet.metastoreSchema.

TODO:

  • More test cases for partition discovery

  • Fix write path after data source write support ([SPARK-5501][SPARK-5420][SQL] Write support for the data source API #4294) is merged

    It turned out to be non-trivial to fall back to old Parquet implementation on the write path when Parquet data source is enabled.  Since we're planning to include data source write support in 1.3.0, I simply ignored two test cases involving Parquet insertion for now.
    
  • Fix outdated comments and documentations

PS: This PR looks big, but more than a half of the changed lines in this PR are trivial changes to test cases. To test Parquet with and without the new data source, almost all Parquet test cases are moved into wrapper driver functions. This introduces hundreds of lines of changes.

Review on Reviewable

if (r == null) null
else if (left.dataType != BinaryType) l == r
else BinaryType.ordering.compare(
l.asInstanceOf[Array[Byte]], r.asInstanceOf[Array[Byte]]) == 0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes SPARK-5509. Hit this bug while testing Parquet filters for new data source implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw this is really expensive. i'd use sth like this: http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/primitives/UnsignedBytes.html

If you don't want to change it as part of this PR, file a jira ticket to track it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed SPARK-5553 to track this. I'd like to make sure equality comparison for binary types works properly in this PR. Also, we're already using Ordering to compare binary values in LessThan and GreaterThan etc., so at least this isn't a performance regression.

@SparkQA
Copy link

SparkQA commented Feb 2, 2015

Test build #26514 has finished for PR 4308 at commit af3683e.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DefaultSource extends RelationProvider with SchemaRelationProvider

@liancheng liancheng changed the title [SPARK-5182] [SPARK-5528] [SQL] WIP: Parquet data source improvements [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-3575] [SQL] WIP: Parquet data source improvements Feb 2, 2015
@SparkQA
Copy link

SparkQA commented Feb 2, 2015

Test build #26537 has finished for PR 4308 at commit 0277e47.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DefaultSource extends RelationProvider with SchemaRelationProvider

@SparkQA
Copy link

SparkQA commented Feb 3, 2015

Test build #26562 has finished for PR 4308 at commit 87689d5.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DefaultSource extends RelationProvider with SchemaRelationProvider

@SparkQA
Copy link

SparkQA commented Feb 3, 2015

Test build #26572 has finished for PR 4308 at commit 170a0f8.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DefaultSource extends RelationProvider with SchemaRelationProvider

@liancheng liancheng force-pushed the parquet-partition-discovery branch 3 times, most recently from 1b11851 to 07599a7 Compare February 3, 2015 03:53
@SparkQA
Copy link

SparkQA commented Feb 3, 2015

Test build #26595 has finished for PR 4308 at commit 1b11851.

  • This patch fails to build.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • class KafkaUtils(object):
    • trait Column extends DataFrame with ExpressionApi
    • class ColumnName(name: String) extends IncomputableColumn(name)
    • trait DataFrame extends DataFrameSpecificApi with RDDApi[Row]
    • class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression])
    • protected[sql] class QueryExecution(val logical: LogicalPlan)
    • class DefaultSource extends RelationProvider with SchemaRelationProvider

@SparkQA
Copy link

SparkQA commented Feb 3, 2015

Test build #26596 has finished for PR 4308 at commit 07599a7.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KafkaUtils(object):
    • public class JDBCUtils
    • trait Column extends DataFrame with ExpressionApi
    • class ColumnName(name: String) extends IncomputableColumn(name)
    • trait DataFrame extends DataFrameSpecificApi with RDDApi[Row]
    • class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression])
    • protected[sql] class QueryExecution(val logical: LogicalPlan)
    • logWarning(s"Couldn't find class $driver", e);
    • implicit class JDBCDataFrame(rdd: DataFrame)
    • class DefaultSource extends RelationProvider with SchemaRelationProvider

@SparkQA
Copy link

SparkQA commented Feb 3, 2015

Test build #26591 has finished for PR 4308 at commit a760555.

  • This patch fails PySpark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • class DefaultSource extends RelationProvider with SchemaRelationProvider

def mergeCatalystSchemas(left: StructType, right: StructType): StructType =
mergeCatalystDataTypes(left, right).asInstanceOf[StructType]

def mergeCatalystDataTypes(left: DataType, right: DataType): DataType =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be great to add more comment explaining what's going on

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also should this live in catalyst? Seems generally useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, will move it to Catalyst in follow-up PRs.

@SparkQA
Copy link

SparkQA commented Feb 3, 2015

Test build #26601 has finished for PR 4308 at commit bcb3ad6.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KafkaUtils(object):
    • public class JDBCUtils
    • trait Column extends DataFrame with ExpressionApi
    • class ColumnName(name: String) extends IncomputableColumn(name)
    • trait DataFrame extends DataFrameSpecificApi with RDDApi[Row]
    • class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression])
    • protected[sql] class QueryExecution(val logical: LogicalPlan)
    • logWarning(s"Couldn't find class $driver", e);
    • implicit class JDBCDataFrame(rdd: DataFrame)
    • class DefaultSource extends RelationProvider with SchemaRelationProvider

def parquetFile(path: String): DataFrame =
DataFrame(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
@scala.annotation.varargs
def parquetFile(paths: String*): DataFrame =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as commented on the other pr, use

def parquetFile(path: String, paths: String*): DataFrame

to make sure this is not ambiguous if we overload the function with another varargs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Makes sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we actually ever going to do that for this function? This makes it harder to do something like parquetFile(listOfFiles: _*) which I think is actually a common usecase.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'll add that if we ever do overload this we can do this disambiguation then.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay and i convinced @rxin too :)

@liancheng liancheng force-pushed the parquet-partition-discovery branch from bcb3ad6 to 5584e24 Compare February 3, 2015 19:44
@SparkQA
Copy link

SparkQA commented Feb 3, 2015

Test build #26667 has finished for PR 4308 at commit 5584e24.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DefaultSource extends RelationProvider with SchemaRelationProvider

@liancheng liancheng force-pushed the parquet-partition-discovery branch from 5584e24 to ae1ee78 Compare February 4, 2015 08:04
@SparkQA
Copy link

SparkQA commented Feb 4, 2015

Test build #26734 has finished for PR 4308 at commit ae1ee78.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Copy link
Contributor Author

retest this please

@liancheng
Copy link
Contributor Author

The last build failure was caused by a flaky ML test case, which is now fixed in master.

@SparkQA
Copy link

SparkQA commented Feb 4, 2015

Test build #26769 has finished for PR 4308 at commit ae1ee78.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
    • trait CreatableRelationProvider

@liancheng
Copy link
Contributor Author

retest this please.

The last build failure reports that isFile and isRoot are not member of org.apache.hadoop.fs.FileStatus, which doesn't make sense (the pull request builder uses Hadoop 2.3.0, and these methods are definitely defined in FileStatus).

@liancheng liancheng changed the title [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-3575] [SQL] WIP: Parquet data source improvements [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-3575] [SQL] Parquet data source improvements Feb 4, 2015

object ParquetRelation2 {
// Whether we should merge schemas collected from all Parquet part-files.
val MERGE_SCHEMA = "parquet.mergeSchema"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why prefix these with parquet? that seems redudant since you can only use them after specifying USING org.apache.spark.sql.parquet

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also have an option to turn off caching?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Will address these in follow-up PR(s).

@liancheng liancheng force-pushed the parquet-partition-discovery branch from 209f324 to 1ad361e Compare February 5, 2015 20:07
@liancheng
Copy link
Contributor Author

Rebased (for the 8th time during the last 72 hours), should be ready to go once Jenkins nods. Will address comments in follow-up PRs.

@liancheng liancheng force-pushed the parquet-partition-discovery branch from 1ad361e to b6946e6 Compare February 5, 2015 21:31
@liancheng
Copy link
Contributor Author

OK, rebased for the 9th time... Addressed all comments except for adding option to disable metadata caching, which I'd like to include in another PR.

@SparkQA
Copy link

SparkQA commented Feb 5, 2015

Test build #26856 has finished for PR 4308 at commit 1ad361e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
    • trait CreatableRelationProvider

@SparkQA
Copy link

SparkQA commented Feb 5, 2015

Test build #26858 has finished for PR 4308 at commit b6946e6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
    • trait CreatableRelationProvider

asfgit pushed a commit that referenced this pull request Feb 5, 2015
…a source improvements

This PR adds three major improvements to Parquet data source:

1.  Partition discovery

    While reading Parquet files resides in Hive style partition directories, `ParquetRelation2` automatically discovers partitioning information and infers partition column types.

    This is also a partial work for [SPARK-5182] [1], which aims to provide first class partitioning support for the data source API.  Related code in this PR can be easily extracted to the data source API level in future versions.

1.  Schema merging

    When enabled, Parquet data source collects schema information from all Parquet part-files and tries to merge them.  Exceptions are thrown when incompatible schemas are detected.  This feature is controlled by data source option `parquet.mergeSchema`, and is enabled by default.

1.  Metastore Parquet table conversion moved to analysis phase

    This greatly simplifies the conversion logic.  `ParquetConversion` strategy can be removed once the old Parquet implementation is removed in the future.

This version of Parquet data source aims to entirely replace the old Parquet implementation.  However, the old version hasn't been removed yet.  Users can fall back to the old version by turning off SQL configuration `spark.sql.parquet.useDataSourceApi`.

Other JIRA tickets fixed as side effects in this PR:

- [SPARK-5509] [3]: `EqualTo` now uses a proper `Ordering` to compare binary types.

- [SPARK-3575] [4]: Metastore schema is now preserved and passed to `ParquetRelation2` via data source option `parquet.metastoreSchema`.

TODO:

- [ ] More test cases for partition discovery
- [x] Fix write path after data source write support (#4294) is merged

      It turned out to be non-trivial to fall back to old Parquet implementation on the write path when Parquet data source is enabled.  Since we're planning to include data source write support in 1.3.0, I simply ignored two test cases involving Parquet insertion for now.

- [ ] Fix outdated comments and documentations

PS: This PR looks big, but more than a half of the changed lines in this PR are trivial changes to test cases. To test Parquet with and without the new data source, almost all Parquet test cases are moved into wrapper driver functions. This introduces hundreds of lines of changes.

[1]: https://issues.apache.org/jira/browse/SPARK-5182
[2]: https://issues.apache.org/jira/browse/SPARK-5528
[3]: https://issues.apache.org/jira/browse/SPARK-5509
[4]: https://issues.apache.org/jira/browse/SPARK-3575

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4308)
<!-- Reviewable:end -->

Author: Cheng Lian <lian@databricks.com>

Closes #4308 from liancheng/parquet-partition-discovery and squashes the following commits:

b6946e6 [Cheng Lian] Fixes MiMA issues, addresses comments
8232e17 [Cheng Lian] Write support for Parquet data source
a49bd28 [Cheng Lian] Fixes spelling typo in trait name "CreateableRelationProvider"
808380f [Cheng Lian] Fixes issues introduced while rebasing
50dd8d1 [Cheng Lian] Addresses @rxin's comment, fixes UDT schema merging
adf2aae [Cheng Lian] Fixes compilation error introduced while rebasing
4e0175f [Cheng Lian] Fixes Python Parquet API, we need Py4J array to call varargs method
0d8ec1d [Cheng Lian] Adds more test cases
b35c8c6 [Cheng Lian] Fixes some typos and outdated comments
dd704fd [Cheng Lian] Fixes Python Parquet API
596c312 [Cheng Lian] Uses switch to control whether use Parquet data source or not
7d0f7a2 [Cheng Lian] Fixes Metastore Parquet table conversion
a1896c7 [Cheng Lian] Fixes all existing Parquet test suites except for ParquetMetastoreSuite
5654c9d [Cheng Lian] Draft version of Parquet partition discovery and schema merging

(cherry picked from commit a9ed511)
Signed-off-by: Michael Armbrust <michael@databricks.com>
@asfgit asfgit closed this in a9ed511 Feb 5, 2015
@liancheng liancheng deleted the parquet-partition-discovery branch February 5, 2015 23:47
/**
* Converts a string to a `Literal` with automatic type inference. Currently only supports
* [[IntegerType]], [[LongType]], [[FloatType]], [[DoubleType]], [[DecimalType.Unlimited]], and
* [[StringType]].
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be reasonable to support DateType and then StringType. In my experience, breaking data down by a date partition is pretty common and useful. My thinking is that if you see a string in the format YYYY-MM-DD (for example, I do recommend that format given its alphabetical sorting, personally, but it doesn't have to be that), then you can probably safely assume that the partition is intended to be a date.

I'm not super familiar with this code though, so I'll have to defer to others' expertise.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Trying DataType before StringType makes sense. I can add this. Thanks for the suggestion!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants