
[SPARK-17183][SPARK-17983][SPARK-18101][SQL] put hive serde table schema to table properties like data source table #14750


Closed
cloud-fan wants to merge 8 commits into apache:master from cloud-fan:minor1

Conversation

cloud-fan
Contributor

What changes were proposed in this pull request?

For data source tables, we put the table schema, partition columns, etc. into table properties to work around some Hive metastore issues, e.g. lack of case preservation and poor decimal type support.

We should do the same for Hive serde tables, to reduce the differences between Hive serde tables and data source tables, e.g. column names should be case-preserving.
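
For reference, a minimal sketch of the mechanism, assuming the `spark.sql.sources.schema.*` property-key convention used by `HiveExternalCatalog` in Spark 2.x (the split threshold below is an illustrative assumption): the case-preserving schema is serialized to JSON and split across several table properties, since the Hive metastore lower-cases column names and caps property value lengths.

    import org.apache.spark.sql.types._

    // Serialize the case-preserving schema to JSON and split it across table
    // properties; the 4000-character threshold is an illustrative assumption.
    val schema = StructType(Seq(
      StructField("UserId", IntegerType),
      StructField("Amount", DecimalType(38, 18))))

    val parts = schema.json.grouped(4000).toSeq
    val schemaProperties =
      Map("spark.sql.sources.schema.numParts" -> parts.size.toString) ++
        parts.zipWithIndex.map { case (part, index) =>
          s"spark.sql.sources.schema.part.$index" -> part
        }
    // These entries are merged into the table's properties before the table is
    // saved to the Hive metastore, and read back to restore the original schema.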

How was this patch tested?

Existing tests, plus a new test in HiveExternalCatalog.

@cloud-fan
Contributor Author

cc @yhuai @gatorsmile

@SparkQA

SparkQA commented Aug 22, 2016

Test build #64202 has finished for PR 14750 at commit 167fd43.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 22, 2016

Test build #64216 has finished for PR 14750 at commit 8fc6bcc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan force-pushed the minor1 branch 2 times, most recently from bbcefcd to 4f3dfa7 on August 23, 2016 01:15
@SparkQA

SparkQA commented Aug 23, 2016

Test build #64251 has finished for PR 14750 at commit bbcefcd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 23, 2016

Test build #64252 has finished for PR 14750 at commit 4f3dfa7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 25, 2016

Test build #64409 has finished for PR 14750 at commit 6c9c130.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 25, 2016

Test build #64416 has finished for PR 14750 at commit f28a0c4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 25, 2016

Test build #64421 has finished for PR 14750 at commit 5b41a39.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// 2. Check if this table is hive compatible
// 2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty
// before save table metadata to Hive.
// 2.1 If it's hive compatible, set serde information in table metadata and try to save
Member

2.1 -> 2.2

@SparkQA

SparkQA commented Aug 30, 2016

Test build #64656 has finished for PR 14750 at commit 89fbbea.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 30, 2016

Test build #64655 has finished for PR 14750 at commit dcb2927.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 30, 2016

Test build #64661 has finished for PR 14750 at commit 52db0ed.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Aug 31, 2016

Test build #64691 has finished for PR 14750 at commit 52db0ed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 6, 2016

Test build #64980 has finished for PR 14750 at commit 60fd368.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 6, 2016

Test build #64998 has finished for PR 14750 at commit f248b8d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

We still need to resolve the case sensitivity issues in checkDuplication.

To reproduce the issue, run the following SQL:

    sql("CREATE TABLE tbl(A int) PARTITIONED BY (a string)")

Below is the error we got:

org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: at least one column must be specified for the table
    at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:720)
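
A hedged sketch of what such a check could look like (illustrative, not the actual Spark implementation): validate the combined data and partition column names case-insensitively during analysis, so the statement above fails with a clear error instead of reaching Hive with an empty column list.

    // Illustrative only, not the actual Spark checkDuplication: reject column
    // names that collide once lower-cased, e.g. data column `A` vs. partition
    // column `a` in the CREATE TABLE above.
    def checkDuplication(columnNames: Seq[String], columnKind: String): Unit = {
      val lowerCased = columnNames.map(_.toLowerCase)
      if (lowerCased.distinct.length != lowerCased.length) {
        val dups = lowerCased.diff(lowerCased.distinct).distinct
        throw new IllegalArgumentException(
          s"Found duplicate $columnKind columns: ${dups.mkString(", ")}")
      }
    }

    // checkDuplication(Seq("A") ++ Seq("a"), "table")  // throws: duplicate "a"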

@SparkQA

SparkQA commented Sep 7, 2016

Test build #65040 has finished for PR 14750 at commit eb3bba4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Hive metastore may change the table schema, e.g. schema inference. If the table
// schema we read back is different from the one in table properties which was written
// when creating table, we should respect the table schema from hive.
table.copy(properties = getOriginalTableProperties(table))
Contributor

@ericl commented Oct 25, 2016

Shouldn't we still restore the original mixed case column names when possible?

robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
…d to answer a query

(This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.)

## What changes were proposed in this pull request?

In a new Spark session, when a partitioned Hive table is converted to use Spark's `HadoopFsRelation` in `HiveMetastoreCatalog`, metadata for every partition of that table are retrieved from the metastore and loaded into driver memory. In addition, every partition's metadata files are read from the filesystem to perform schema inference.

If a user queries such a table with predicates which prune that table's partitions, we would like to be able to answer that query without consulting partition metadata which are not involved in the query. When querying a table with a large number of partitions for some data from a small number of partitions (maybe even a single partition), the current conversion strategy is highly inefficient. I suspect this scenario is not uncommon in the wild.

In addition to being inefficient in running time, the current strategy is inefficient in its use of driver memory. When the sum of the number of partitions of all tables loaded in a driver reaches a certain level (somewhere in the tens of thousands), their cached data exhaust all driver heap memory in the default configuration. I suspect this scenario is less common (in that not too many deployments work with tables with tens of thousands of partitions), however this does illustrate how large the memory footprint of this metadata can be. With tables with hundreds or thousands of partitions, I would expect the `HiveMetastoreCatalog` table cache to represent a significant portion of the driver's heap space.

This PR proposes an alternative approach. Basically, it makes four changes:

1. It adds a new method, `listPartitionsByFilter` to the Catalyst `ExternalCatalog` trait which returns the partition metadata for a given sequence of partition pruning predicates.
1. It refactors the `FileCatalog` type hierarchy to include a new `TableFileCatalog` to efficiently return files only for partitions matching a sequence of partition pruning predicates.
1. It removes partition loading and caching from `HiveMetastoreCatalog`.
1. It adds a new Catalyst optimizer rule, `PruneFileSourcePartitions`, which applies a plan's partition-pruning predicates to prune out unnecessary partition files from a `HadoopFsRelation`'s underlying file catalog.

The net effect is that when a query over a partitioned Hive table is planned, the analyzer retrieves the table metadata from `HiveMetastoreCatalog`. As part of this operation, the `HiveMetastoreCatalog` builds a `HadoopFsRelation` with a `TableFileCatalog`. It does not load any partition metadata or scan any files. The optimizer prunes away unnecessary table partitions by sending the partition-pruning predicates to the relation's `TableFileCatalog`. The `TableFileCatalog` in turn calls the `listPartitionsByFilter` method on its external catalog. This queries the Hive metastore, passing along those filters.
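
For orientation, a rough Scala sketch of the catalog hook described above (the signature is approximated from this description; consult the actual `ExternalCatalog` trait for the authoritative form):

    import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
    import org.apache.spark.sql.catalyst.expressions.Expression

    // Approximated sketch: return only the partitions surviving the given
    // partition-pruning predicates, letting the metastore do the filtering.
    trait PartitionPruningCatalog {
      def listPartitionsByFilter(
          db: String,
          table: String,
          predicates: Seq[Expression]): Seq[CatalogTablePartition]
    }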

As a bonus, performing partition pruning during optimization leads to a more accurate relation size estimate. This, along with c481bdf, can lead to automatic, safe application of the broadcast optimization in a join where it might previously have been omitted.

## Open Issues

1. This PR omits partition metadata caching. I can add this once the overall strategy for the cold path is established, perhaps in a future PR.
1. This PR removes and omits partitioned Hive table schema reconciliation. As a result, it fails to find Parquet schema columns with upper case letters because of the Hive metastore's case-insensitivity. This issue may be fixed by apache#14750, but that PR appears to have stalled. ericl has contributed to this PR a workaround for Parquet wherein schema reconciliation occurs at query execution time instead of planning. Whether ORC requires a similar patch is an open issue.
1. This PR omits an implementation of `listPartitionsByFilter` for the `InMemoryCatalog`.
1. This PR breaks parquet log output redirection during query execution. I can work around this by running `Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$")` first thing in a Spark shell session, but I haven't figured out how to fix this properly.

## How was this patch tested?

The current Spark unit tests were run, and some ad-hoc tests were performed to validate that only the necessary partition metadata is loaded.

Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes apache#14690 from mallman/spark-16980-lazy_partition_fetching.
@SparkQA

SparkQA commented Nov 3, 2016

Test build #68070 has finished for PR 14750 at commit 7b6fb13.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 3, 2016

Test build #68071 has finished for PR 14750 at commit 886a5af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// If this is an external data source table...
if (tableDefinition.tableType == EXTERNAL &&
if (tableDefinition.provider.get != "hive" && tableDefinition.tableType == EXTERNAL &&
Contributor Author

We should avoid calling this method for Hive tables in the try-catch.

private def requireTableExists(db: String, table: String): Unit = {
withClient { getTable(db, table) }
/**
* Get the raw table metadata from hive metastore directly.
Contributor

Let's also define raw here.

val message = s"Persisting Hive serde table $qualifiedTableName into Hive metastore."
val tableWithDataSourceProps = tableDefinition.copy(
properties = tableDefinition.properties ++ tableProperties)
(Some(tableWithDataSourceProps), message)
Contributor

Add some comments here.

// If it's a managed table and we are renaming it, then the path option becomes inaccurate
// and we need to update it according to the new table name.
val hasPathOption = new CaseInsensitiveMap(rawTable.storage.properties).contains("path")
val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) {
Contributor

Let's also explain why we are not changing locationUri here (we need to mention that Hive will change it inside alterTable; see https://github.com/apache/spark/pull/14750/files#diff-159191585e10542f013cb3a714f26075R448).
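
To make the requested rationale concrete, a hedged completion of the snippet above (it reuses `rawTable` and `hasPathOption` from the diff; `defaultTablePath` and `newName` are illustrative names):

    // Completing the snippet above: only the "path" storage option is
    // rewritten here. The locationUri field is deliberately left alone,
    // because Hive itself updates it inside the alterTable call that follows.
    val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) {
      val newPath = defaultTablePath(newName)  // illustrative helper
      rawTable.storage.copy(
        properties = rawTable.storage.properties + ("path" -> newPath))
    } else {
      rawTable.storage
    }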

// field. We should respect the old `locationUri` even it's None.
val oldLocation = getLocationFromStorageProps(oldTableDef)
if (oldLocation == newLocation) {
storageWithPathOption.copy(locationUri = oldTableDef.storage.locationUri)
Contributor

Let's use an example to explain what we are doing here.

@@ -537,22 +559,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table
} else {
getProviderFromTableProperties(table).map { provider =>
Contributor

Seems we need to make this part more reader-friendly. Maybe:

getProviderFromTableProperties(table) match {
  case None => // regular hive tables
  case Some("hive") => // hive serde table created by spark 2.1 or higher version
  case Some(other) => // data source table
}

@@ -620,7 +667,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat

val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
getTable(db, table).partitionColumnNames.foreach { colName =>
orderedPartitionSpec.put(colName, partition(colName))
// Lowercase the partition column names before passing the partition spec to Hive client, as
// Hive metastore is not case preserving.
Contributor

Let's mention that Hive will lower-case the partition column names, so if we do not convert the names to lower case, Hive will complain.
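
A minimal sketch of that convention (names are illustrative): lower-case the partition spec keys before calling into the Hive client, since Hive lower-cases partition column names and rejects specs whose keys do not match.

    // spec maps partition column name -> value, e.g. Map("PartCol" -> "2016")
    def lowerCasePartitionSpec(spec: Map[String, String]): Map[String, String] =
      spec.map { case (colName, value) => (colName.toLowerCase, value) }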

@ericl
Contributor

ericl commented Nov 4, 2016

Could you also verify that

// TODO(ekl) enable for hive tables as well once SPARK-17983 is fixed
works for hive tables now?

@SparkQA

SparkQA commented Nov 4, 2016

Test build #68117 has finished for PR 14750 at commit afa5d85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan changed the title [SPARK-17183][SQL] put hive serde table schema to table properties like data source table to [SPARK-17183][SPARK-17983][SPARK-18101][SQL] put hive serde table schema to table properties like data source table on Nov 4, 2016
@SparkQA

SparkQA commented Nov 4, 2016

Test build #68134 has finished for PR 14750 at commit b02cf0d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@ericl left a comment

I tried this out locally and it works, even with all the parquet / orc readers reverted. Just some questions.


if (tableDefinition.tableType == VIEW) {
client.createTable(tableDefinition, ignoreIfExists)
} else if (tableDefinition.provider.get == "hive") {
Contributor

Can we compare against a constant instead of a string literal here?

// will be updated automatically in Hive metastore by the `alterTable` call at the end of this
// method. Here we only update the path option if the path option already exists in storage
// properties, to avoid adding a unnecessary path option for Hive serde tables.
val hasPathOption = new CaseInsensitiveMap(rawTable.storage.properties).contains("path")
Contributor

Is this change related to this PR?

Contributor Author

Not related, but it's a follow-up of https://github.com/apache/spark/pull/15024/files#diff-159191585e10542f013cb3a714f26075R422, which may add an extra path option to Hive serde tables.

val propsWithoutPath = table.storage.properties.filterKeys(_.toLowerCase != "path")
val propsWithoutPath = table.storage.properties.filter {
case (k, v) => k.toLowerCase != "path"
}
Contributor

Is this equivalent?

Contributor Author

No, the map returned by filterKeys is not serializable; I'll add a comment.
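
The gotcha is easy to demonstrate; a small sketch (behavior as of Scala 2.11, which Spark used at the time): `filterKeys` returns a lazy view whose class is not `Serializable`, while `filter` materializes a plain immutable `Map`.

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    val props = Map("path" -> "/tmp/t", "serialization.format" -> "1")

    val viaFilterKeys = props.filterKeys(_.toLowerCase != "path")            // lazy view
    val viaFilter = props.filter { case (k, _) => k.toLowerCase != "path" }  // plain Map

    def serializes(obj: AnyRef): Boolean =
      try { new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj); true }
      catch { case _: NotSerializableException => false }

    serializes(viaFilter)     // true
    serializes(viaFilterKeys) // false on Scala 2.11: the view class is not Serializable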

// old `locationUri` even it's None.
val storageWithNewLocation = if (oldLocation == newLocation) {
oldTableDef.storage
val newStorage = if (tableDefinition.provider.get == "hive") {
Contributor

Also compare against constant?

// schema we read back is different(ignore case and nullability) from the one in table
// properties which was written when creating table, we should respect the table schema
// from hive.
hiveTable
Contributor

Are there cases where this could be an issue? Should we log a warning here?

Contributor Author

good idea!

if (!isDataSourceTable) {
fail(
s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
s"${HadoopFsRelation.getClass.getCanonicalName}.")
}
userSpecifiedLocation match {
case Some(location) =>
assert(table.storage.locationUri.get === location)
assert(r.options("path") === location)
Contributor

Should we check the location uri too?


Contributor

@ericl left a comment

Overall LGTM. I can follow up with a PR to remove the parquet and orc reader hacks after this goes in.

// from hive.
logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
"different from the schema when this table was created by Spark SQL" +
s"(${schemaFromTableProps.simpleString}). We have to trust the table schema from Hive " +
Contributor

Nit: clarify that we are falling back to the hive metastore schema. Trusting it sounds a little ambiguous.

@SparkQA

SparkQA commented Nov 5, 2016

Test build #68176 has finished for PR 14750 at commit c057d0c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 5, 2016

Test build #68181 has finished for PR 14750 at commit 3bd9362.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Nov 5, 2016

Merging in master/branch-2.1. Thanks.

asfgit pushed a commit that referenced this pull request Nov 5, 2016
…ema to table properties like data source table

## What changes were proposed in this pull request?

For data source tables, we put the table schema, partition columns, etc. into table properties to work around some Hive metastore issues, e.g. lack of case preservation and poor decimal type support.

We should do the same for Hive serde tables, to reduce the differences between Hive serde tables and data source tables, e.g. column names should be case-preserving.

## How was this patch tested?

Existing tests, plus a new test in `HiveExternalCatalog`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14750 from cloud-fan/minor1.

(cherry picked from commit 95ec4e2)
Signed-off-by: Reynold Xin <rxin@databricks.com>
asfgit closed this in 95ec4e2 on Nov 5, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…d to answer a query
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…ema to table properties like data source table