
[SPARK-5501][SPARK-5420][SQL] Write support for the data source API #4294

Closed

wants to merge 18 commits into from

Conversation

yhuai
Contributor

@yhuai yhuai commented Jan 30, 2015

This PR aims to support INSERT INTO/OVERWRITE TABLE tableName and CREATE TABLE tableName AS SELECT for the data source API (partitioned tables are not supported).

In this PR, I am also adding support for IF NOT EXISTS to our DDL parser. The current semantics of IF NOT EXISTS are as follows.

  • For a CREATE TEMPORARY TABLE statement, IF NOT EXISTS is not supported for now.
  • For a CREATE TABLE statement (we are creating a metastore table), if there is an existing table having the same name ...
    • when the IF NOT EXISTS clause is used, we will do nothing.
    • when the IF NOT EXISTS clause is not used, the user will see an exception saying the table already exists.
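The branching above can be sketched as a small, self-contained function. This is an illustrative sketch only, not the actual Spark code; the function name, parameter names, and return messages are hypothetical.

```scala
// Hypothetical sketch of the IF NOT EXISTS semantics described above.
// allowExisting is true when the IF NOT EXISTS clause is present.
object CreateTableSemantics {
  def createTable(
      tableName: String,
      allowExisting: Boolean,
      existingTables: Set[String]): String = {
    if (existingTables.contains(tableName)) {
      if (allowExisting) {
        // IF NOT EXISTS given and the table exists: do nothing.
        s"Table $tableName already exists; doing nothing."
      } else {
        // No IF NOT EXISTS: the user sees an error.
        throw new RuntimeException(s"Table $tableName already exists.")
      }
    } else {
      s"Created table $tableName."
    }
  }
}
```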

TODOs:

  • CTAS support
  • Programmatic APIs
  • Python API (another PR)
  • More unit tests
  • Documents (another PR)

@marmbrus @liancheng @rxin

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26426 has started for PR 4294 at commit a2f9c06.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26426 has finished for PR 4294 at commit a2f9c06.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait TableScan extends BaseRelation
    • trait PrunedScan extends BaseRelation
    • trait PrunedFilteredScan extends BaseRelation
    • trait CatalystScan extends BaseRelation
    • trait InsertableRelation extends BaseRelation

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26426/

@SparkQA

SparkQA commented Jan 31, 2015

Test build #26448 has started for PR 4294 at commit db8dcda.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 31, 2015

Test build #26448 has finished for PR 4294 at commit db8dcda.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • protected[sql] class DDLException(message: String) extends Exception(message)
    • trait TableScan extends BaseRelation
    • trait PrunedScan extends BaseRelation
    • trait PrunedFilteredScan extends BaseRelation
    • trait CatalystScan extends BaseRelation
    • trait InsertableRelation extends BaseRelation
    • case class CreateMetastoreDataSourceAsSelect(

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26448/

@SparkQA

SparkQA commented Jan 31, 2015

Test build #26450 has started for PR 4294 at commit 43747d3.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 31, 2015

Test build #26450 has finished for PR 4294 at commit 43747d3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • protected[sql] class DDLException(message: String) extends Exception(message)
    • trait TableScan extends BaseRelation
    • trait PrunedScan extends BaseRelation
    • trait PrunedFilteredScan extends BaseRelation
    • trait CatalystScan extends BaseRelation
    • trait InsertableRelation extends BaseRelation
    • case class CreateMetastoreDataSourceAsSelect(

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26450/

    | INSERT ~> INTO ~> TABLE ~> relation ~ select ^^ {
        case r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, false)
      }
    )
Contributor


Can be simplified to this:

    INSERT ~> (OVERWRITE ^^^ true | INTO ^^^ false) ~ (TABLE ~> relation) ~ select ^^ {
      case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
    }
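The `^^^` combinator in the suggestion above throws away a parser's result and substitutes a fixed value, which is how `OVERWRITE` and `INTO` collapse into a single `Boolean`-producing alternative. A minimal standalone sketch, assuming `scala.util.parsing.combinator` is available (it shipped with the Scala standard library of that era; on newer Scala versions it is the separate scala-parser-combinators module):

```scala
import scala.util.parsing.combinator.JavaTokenParsers

// Standalone illustration of the ^^^ combinator: "OVERWRITE" parses to true
// and "INTO" parses to false, with no intermediate case matching needed.
object InsertModeParser extends JavaTokenParsers {
  val mode: Parser[Boolean] = "OVERWRITE" ^^^ true | "INTO" ^^^ false

  def parseMode(input: String): Boolean = parseAll(mode, input).get
}
```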

@SparkQA

SparkQA commented Jan 31, 2015

Test build #26467 has started for PR 4294 at commit d37b19c.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 31, 2015

Test build #26467 has finished for PR 4294 at commit d37b19c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • protected[sql] class DDLException(message: String) extends Exception(message)
    • trait TableScan extends BaseRelation
    • trait PrunedScan extends BaseRelation
    • trait PrunedFilteredScan extends BaseRelation
    • trait CatalystScan extends BaseRelation
    • trait InsertableRelation extends BaseRelation
    • case class CreateMetastoreDataSourceAsSelect(

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26467/

@SparkQA

SparkQA commented Jan 31, 2015

Test build #26468 has started for PR 4294 at commit 95a7c71.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 31, 2015

Test build #26468 has finished for PR 4294 at commit 95a7c71.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • val elem = s"array (class $
    • val elem = s"externalizable object (class $
    • val elem = s"object (class $
    • implicit class ObjectStreamClassMethods(val desc: ObjectStreamClass) extends AnyVal
    • class IsotonicRegressionModel (
    • protected[sql] class DDLException(message: String) extends Exception(message)
    • trait TableScan extends BaseRelation
    • trait PrunedScan extends BaseRelation
    • trait PrunedFilteredScan extends BaseRelation
    • trait CatalystScan extends BaseRelation
    • trait InsertableRelation extends BaseRelation
    • case class CreateMetastoreDataSourceAsSelect(

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26468/


    @DeveloperApi
    trait InsertableRelation extends BaseRelation {
      def insertInto(data: DataFrame, overwrite: Boolean): Unit
    }
Contributor


The method name looks a bit weird when you put it together: relation.insertInto(dataFrame) reads as if we are inserting a relation into a data frame... Maybe just insert?
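The readability argument can be seen in a toy, Spark-free sketch of the suggested rename. The trait and class names here are illustrative, and `Seq[String]` stands in for `DataFrame`:

```scala
// With `insert`, the call site reads relation.insert(rows, overwrite),
// avoiding the "insert relation into data frame" reading of insertInto.
trait Insertable {
  def insert(rows: Seq[String], overwrite: Boolean): Unit
}

class InMemoryRelation extends Insertable {
  private var data = Vector.empty[String]

  def insert(rows: Seq[String], overwrite: Boolean): Unit = {
    // Overwrite replaces the existing contents; append otherwise.
    data = if (overwrite) rows.toVector else data ++ rows
  }

  def contents: Vector[String] = data
}
```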

@SparkQA

SparkQA commented Feb 1, 2015

Test build #26469 has started for PR 4294 at commit 1a719a5.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 1, 2015

Test build #26469 has finished for PR 4294 at commit 1a719a5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • val elem = s"array (class $
    • val elem = s"externalizable object (class $
    • val elem = s"object (class $
    • implicit class ObjectStreamClassMethods(val desc: ObjectStreamClass) extends AnyVal
    • class IsotonicRegressionModel (
    • protected[sql] class DDLException(message: String) extends Exception(message)
    • trait TableScan extends BaseRelation
    • trait PrunedScan extends BaseRelation
    • trait PrunedFilteredScan extends BaseRelation
    • trait CatalystScan extends BaseRelation
    • trait InsertableRelation extends BaseRelation
    • case class CreateMetastoreDataSourceAsSelect(

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26561/

@@ -77,6 +77,14 @@ trait SchemaRelationProvider {
schema: StructType): BaseRelation
}

trait CreateableRelation {
Contributor Author


Change it to CreateableRelationProvider

yhuai added 2 commits February 2, 2015 20:51
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
	sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@SparkQA

SparkQA commented Feb 3, 2015

Test build #26604 has started for PR 4294 at commit 142372a.

  • This patch merges cleanly.

@yhuai yhuai changed the title [WIP][SPARK-5501][SPARK-5420][SQL] Write support for the data source API [SPARK-5501][SPARK-5420][SQL] Write support for the data source API Feb 3, 2015
@SparkQA

SparkQA commented Feb 3, 2015

Test build #26605 has started for PR 4294 at commit 1c98881.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 3, 2015

Test build #26604 has finished for PR 4294 at commit 142372a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • sys.error(s"Failed to load class for data source: $provider")
    • protected[sql] class DDLException(message: String) extends Exception(message)
    • trait CreateableRelationProvider
    • trait TableScan extends BaseRelation
    • trait PrunedScan extends BaseRelation
    • trait PrunedFilteredScan extends BaseRelation
    • trait CatalystScan extends BaseRelation
    • trait InsertableRelation extends BaseRelation
    • case class CreateMetastoreDataSourceAsSelect(

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26604/

@SparkQA

SparkQA commented Feb 3, 2015

Test build #26605 has finished for PR 4294 at commit 1c98881.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class JDBCUtils
    • logWarning(s"Couldn't find class $driver", e);
    • implicit class JDBCDataFrame(rdd: DataFrame)
    • sys.error(s"Failed to load class for data source: $provider")
    • protected[sql] class DDLException(message: String) extends Exception(message)
    • trait CreateableRelationProvider
    • trait TableScan extends BaseRelation
    • trait PrunedScan extends BaseRelation
    • trait PrunedFilteredScan extends BaseRelation
    • trait CatalystScan extends BaseRelation
    • trait InsertableRelation extends BaseRelation
    • case class CreateMetastoreDataSourceAsSelect(

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26605/

@SparkQA

SparkQA commented Feb 3, 2015

Test build #26608 has started for PR 4294 at commit 3db1539.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 3, 2015

Test build #26608 has finished for PR 4294 at commit 3db1539.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class KafkaUtils(object):
    • public class JDBCUtils
    • trait Column extends DataFrame with ExpressionApi
    • class ColumnName(name: String) extends IncomputableColumn(name)
    • trait DataFrame extends DataFrameSpecificApi with RDDApi[Row]
    • class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression])
    • protected[sql] class QueryExecution(val logical: LogicalPlan)
    • logWarning(s"Couldn't find class $driver", e);
    • implicit class JDBCDataFrame(rdd: DataFrame)
    • sys.error(s"Failed to load class for data source: $provider")
    • protected[sql] class DDLException(message: String) extends Exception(message)
    • trait CreateableRelationProvider
    • trait TableScan extends BaseRelation
    • trait PrunedScan extends BaseRelation
    • trait PrunedFilteredScan extends BaseRelation
    • trait CatalystScan extends BaseRelation
    • trait InsertableRelation extends BaseRelation
    • case class CreateMetastoreDataSourceAsSelect(

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26608/

@rxin
Contributor

rxin commented Feb 3, 2015

Thanks. I'm merging this in master.

@asfgit asfgit closed this in 13531dd Feb 3, 2015
asfgit pushed a commit that referenced this pull request Feb 5, 2015
…a source improvements

This PR adds three major improvements to Parquet data source:

1.  Partition discovery

    When reading Parquet files residing in Hive-style partition directories, `ParquetRelation2` automatically discovers partitioning information and infers partition column types.

    This is also a partial work for [SPARK-5182] [1], which aims to provide first class partitioning support for the data source API.  Related code in this PR can be easily extracted to the data source API level in future versions.

1.  Schema merging

    When enabled, Parquet data source collects schema information from all Parquet part-files and tries to merge them.  Exceptions are thrown when incompatible schemas are detected.  This feature is controlled by data source option `parquet.mergeSchema`, and is enabled by default.

1.  Metastore Parquet table conversion moved to analysis phase

    This greatly simplifies the conversion logic.  `ParquetConversion` strategy can be removed once the old Parquet implementation is removed in the future.

This version of Parquet data source aims to entirely replace the old Parquet implementation.  However, the old version hasn't been removed yet.  Users can fall back to the old version by turning off SQL configuration `spark.sql.parquet.useDataSourceApi`.

Other JIRA tickets fixed as side effects in this PR:

- [SPARK-5509] [3]: `EqualTo` now uses a proper `Ordering` to compare binary types.

- [SPARK-3575] [4]: Metastore schema is now preserved and passed to `ParquetRelation2` via data source option `parquet.metastoreSchema`.

TODO:

- [ ] More test cases for partition discovery
- [x] Fix write path after data source write support (#4294) is merged

      It turned out to be non-trivial to fall back to old Parquet implementation on the write path when Parquet data source is enabled.  Since we're planning to include data source write support in 1.3.0, I simply ignored two test cases involving Parquet insertion for now.

- [ ] Fix outdated comments and documentations

PS: This PR looks big, but more than a half of the changed lines in this PR are trivial changes to test cases. To test Parquet with and without the new data source, almost all Parquet test cases are moved into wrapper driver functions. This introduces hundreds of lines of changes.

[1]: https://issues.apache.org/jira/browse/SPARK-5182
[2]: https://issues.apache.org/jira/browse/SPARK-5528
[3]: https://issues.apache.org/jira/browse/SPARK-5509
[4]: https://issues.apache.org/jira/browse/SPARK-3575


Author: Cheng Lian <lian@databricks.com>

Closes #4308 from liancheng/parquet-partition-discovery and squashes the following commits:

b6946e6 [Cheng Lian] Fixes MiMA issues, addresses comments
8232e17 [Cheng Lian] Write support for Parquet data source
a49bd28 [Cheng Lian] Fixes spelling typo in trait name "CreateableRelationProvider"
808380f [Cheng Lian] Fixes issues introduced while rebasing
50dd8d1 [Cheng Lian] Addresses @rxin's comment, fixes UDT schema merging
adf2aae [Cheng Lian] Fixes compilation error introduced while rebasing
4e0175f [Cheng Lian] Fixes Python Parquet API, we need Py4J array to call varargs method
0d8ec1d [Cheng Lian] Adds more test cases
b35c8c6 [Cheng Lian] Fixes some typos and outdated comments
dd704fd [Cheng Lian] Fixes Python Parquet API
596c312 [Cheng Lian] Uses switch to control whether use Parquet data source or not
7d0f7a2 [Cheng Lian] Fixes Metastore Parquet table conversion
a1896c7 [Cheng Lian] Fixes all existing Parquet test suites except for ParquetMetastoreSuite
5654c9d [Cheng Lian] Draft version of Parquet partition discovery and schema merging

(cherry picked from commit a9ed511)
Signed-off-by: Michael Armbrust <michael@databricks.com>
asfgit pushed a commit that referenced this pull request Feb 5, 2015
@yhuai yhuai deleted the writeSupport branch February 11, 2015 18:22
6 participants