
[SPARK-6538][SQL] Add missing nullable Metastore fields when merging a Parquet schema #5188


Closed (wants to merge 1 commit)

Conversation

@budde commented Mar 25, 2015

When Spark SQL infers a schema for a DataFrame, it will take the union of all field types present in the structured source data (e.g. an RDD of JSON data). When the source data for a row doesn't define a particular field on the DataFrame's schema, a null value will simply be assumed for this field. This workflow makes it very easy to construct tables and query over a set of structured data with a nonuniform schema. However, this behavior is not consistent in some cases when dealing with Parquet files and an external table managed by an external Hive metastore.
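For illustration, here is a minimal sketch of that inference behavior using the Spark 1.3-era API (the `SparkContext` named `sc` and the sample records are assumptions for the example):

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc) // assumes an existing SparkContext `sc`

// Two JSON records with different field sets; the inferred schema is the
// union of all fields seen, and every field is marked nullable.
val records = sc.parallelize(Seq(
  """{"a": 1, "b": "x"}""",
  """{"a": 2, "c": true}"""
))
val df = sqlContext.jsonRDD(records)
df.printSchema()
// root
//  |-- a: long (nullable = true)
//  |-- b: string (nullable = true)
//  |-- c: boolean (nullable = true)
// Rows that never defined `b` or `c` simply read back as null there.
```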

In our particular use case, we use Spark Streaming to parse and transform our input data and then apply a window function to save an arbitrarily sized batch of data as a Parquet file, which itself will be added as a partition to an external Hive table via an "ALTER TABLE... ADD PARTITION..." statement. Since our input data is nonuniform, it is expected that not every partition batch will contain every field present in the table's schema obtained from the Hive metastore. As such, we expect that the schema of some of our Parquet files may not contain the same set of fields present in the full metastore schema.
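A rough sketch of that workflow (the path, table name, and the `batchDF` DataFrame are hypothetical; the SQL statement requires a HiveContext named `sqlContext`):

```scala
// Save one windowed batch as a Parquet file, then register that file as
// a new partition of the external Hive table.
batchDF.saveAsParquetFile("hdfs:///warehouse/events/dt=2015-03-25")
sqlContext.sql(
  """ALTER TABLE events ADD PARTITION (dt='2015-03-25')
    |LOCATION 'hdfs:///warehouse/events/dt=2015-03-25'""".stripMargin)
```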

In such cases, it seems natural that Spark SQL would simply assume null values for any missing fields in the partition's Parquet file, assuming these fields are specified as nullable by the metastore schema. This is not the case in the current implementation of ParquetRelation2. The mergeMetastoreParquetSchema() method used to reconcile differences between a Parquet file's schema and a schema retrieved from the Hive metastore will raise an exception if the Parquet file doesn't match the same set of fields specified by the metastore.

This pull request alters the behavior of mergeMetastoreParquetSchema() by having it first add any nullable fields from the metastore schema to the Parquet file schema if they aren't already present there.
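In spirit, the change amounts to something like the following sketch over StructType (illustrative only, not the actual ParquetRelation2 code):

```scala
import org.apache.spark.sql.types.{StructField, StructType}

// Before reconciling the two schemas, pad the Parquet schema with any
// nullable metastore fields it is missing, so the merge no longer fails.
def addMissingNullableFields(
    metastoreSchema: StructType,
    parquetSchema: StructType): StructType = {
  val present = parquetSchema.fieldNames.toSet
  val missing = metastoreSchema.filter { f =>
    f.nullable && !present.contains(f.name)
  }
  StructType(parquetSchema.fields ++ missing)
}
```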

Besides the usual code quality and correctness feedback, I'd appreciate any comments specifically on:

  • should this be the assumed behavior of the mergeMetastoreParquetSchema() method, or should I refactor this pull request to make this behavior dependent on a configuration option?
  • am I correct in submitting a pull request to change newParquet.scala in branch-1.3 or should I have submitted it to the master branch?

Thanks for taking a look!

@AmplabJenkins

Can one of the admins verify this patch?

@marmbrus
Contributor

The behavior you describe sounds like a reasonable default to me, but I'll let @liancheng comment on the specifics.

Regarding making a pull request, generally these should be opened against master unless this is a port to a specific branch that does not apply cleanly to master. We will backport as needed when merging. We will also have to reconcile this with #5141.

@liancheng
Contributor

Hey @budde, sorry that I just merged #5141 to fix a regression, which introduced conflicts with this PR. As @marmbrus suggested, would you mind opening a PR against master?

I verified that, with the new Parquet data source and metastore Parquet table conversion disabled, both 1.2.0 and 1.3.0 work in this scenario.
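For reference, those two settings (as I understand the 1.3-era configuration keys) can be turned off like so:

```scala
// Disable the new Parquet data source code path and the automatic
// conversion of metastore Parquet tables, respectively.
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")
```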

After thinking more about the semantics of the metastore / Parquet schema reconciliation process, I think the rules should be as follows (sketched in code below):

  1. No field should share the same name in both schemas but have a different data type
  2. The reconciled schema should contain exactly the fields in the metastore schema
  3. When reading a Parquet file with extra fields that don't exist in the metastore schema, ignore them
  4. When reading a Parquet file with fewer fields than the metastore schema, add the missing ones as nullable fields (just as this PR does)
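
Here is one way those four rules could look in code (a hedged sketch over StructType, not the shipped implementation):

```scala
import org.apache.spark.sql.types.{StructField, StructType}

def reconcile(metastore: StructType, parquet: StructType): StructType = {
  val parquetFields = parquet.map(f => f.name -> f).toMap
  StructType(metastore.map { mField =>
    parquetFields.get(mField.name) match {
      // Rule 1: a shared name with a different data type is an error.
      case Some(pField) if pField.dataType != mField.dataType =>
        sys.error(s"Conflicting data types for field ${mField.name}")
      // Rules 2 and 3: iterating the metastore side keeps exactly its
      // fields and implicitly ignores extra Parquet-only fields.
      case Some(pField) => pField
      // Rule 4: fields absent from the Parquet file become nullable.
      case None => mField.copy(nullable = true)
    }
  })
}
```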

@budde
Author

budde commented Mar 26, 2015

Thanks for the input, @marmbrus and @liancheng. I'll resolve the conflicts and open a new PR against master.

@marmbrus
Contributor

@liancheng those rules sound pretty reasonable to me. Maybe we should add them to the programming guide?

@liancheng
Contributor

@marmbrus Sure. Opening a PR for this.

@liancheng
Contributor

Just for future reference, #5214 supersedes this PR.

asfgit pushed a commit that referenced this pull request Mar 28, 2015
[SPARK-6538][SQL] Add missing nullable Metastore fields when merging a Parquet schema

Opening to replace #5188.

When Spark SQL infers a schema for a DataFrame, it will take the union of all field types present in the structured source data (e.g. an RDD of JSON data). When the source data for a row doesn't define a particular field on the DataFrame's schema, a null value will simply be assumed for this field. This workflow makes it very easy to construct tables and query over a set of structured data with a nonuniform schema. However, this behavior is not consistent in some cases when dealing with Parquet files and an external table managed by an external Hive metastore.

In our particular use case, we use Spark Streaming to parse and transform our input data and then apply a window function to save an arbitrarily sized batch of data as a Parquet file, which itself will be added as a partition to an external Hive table via an *"ALTER TABLE... ADD PARTITION..."* statement. Since our input data is nonuniform, it is expected that not every partition batch will contain every field present in the table's schema obtained from the Hive metastore. As such, we expect that the schema of some of our Parquet files may not contain the same set of fields present in the full metastore schema.

In such cases, it seems natural that Spark SQL would simply assume null values for any missing fields in the partition's Parquet file, assuming these fields are specified as nullable by the metastore schema. This is not the case in the current implementation of ParquetRelation2. The **mergeMetastoreParquetSchema()** method used to reconcile differences between a Parquet file's schema and a schema retrieved from the Hive metastore will raise an exception if the Parquet file doesn't match the same set of fields specified by the metastore.

This pull request alters the behavior of **mergeMetastoreParquetSchema()** by having it first add any nullable fields from the metastore schema to the Parquet file schema if they aren't already present there.

Author: Adam Budde <budde@amazon.com>

Closes #5214 from budde/nullable-fields and squashes the following commits:

a52d378 [Adam Budde] Refactor ParquetSchemaSuite.scala for cases now permitted by SPARK-6471 and SPARK-6538
9041bfa [Adam Budde] Add missing nullable Metastore fields when merging a Parquet schema

(cherry picked from commit 5909f09)
Signed-off-by: Cheng Lian <lian@databricks.com>
asfgit pushed a commit that referenced this pull request Jun 23, 2015
This PR adds a section about Hive metastore Parquet table conversion. It documents:

1. Schema reconciliation rules introduced in #5214 (see [this comment][1] in #5188)
2. Metadata refreshing requirement introduced in #5339 (see the sketch after the commit list below)

[1]: #5188 (comment)

Author: Cheng Lian <lian@databricks.com>

Closes #5348 from liancheng/sql-doc-parquet-conversion and squashes the following commits:

42ae0d0 [Cheng Lian] Adds Python `refreshTable` snippet
4c9847d [Cheng Lian] Resorts to SQL for Python metadata refreshing snippet
756e660 [Cheng Lian] Adds Python snippet for metadata refreshing
50675db [Cheng Lian] Addes Hive metastore Parquet table conversion section
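
For reference, the metadata refreshing requirement documented there boils down to a call like the following (assuming a HiveContext named `sqlContext`; the table name is hypothetical):

```scala
// After files are added or changed outside of Spark SQL, refresh the
// cached metadata for the converted metastore Parquet table.
sqlContext.refreshTable("events")
```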