[SPARK-17183][SPARK-17983][SPARK-18101][SQL] put hive serde table schema to table properties like data source table #14750
Conversation
Test build #64202 has finished for PR 14750 at commit
Test build #64216 has finished for PR 14750 at commit
Force-pushed from bbcefcd to 4f3dfa7.
Test build #64251 has finished for PR 14750 at commit
Test build #64252 has finished for PR 14750 at commit
Test build #64409 has finished for PR 14750 at commit
Test build #64416 has finished for PR 14750 at commit
Test build #64421 has finished for PR 14750 at commit
// 2. Check if this table is hive compatible
// 2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty
// before save table metadata to Hive.
// 2.1 If it's hive compatible, set serde information in table metadata and try to save
2.1 -> 2.2
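A minimal sketch of the flow those comments describe, with simplified stand-in types (this is not the actual HiveExternalCatalog code from the PR; the serde class shown is just an example value):

```scala
// Sketch only: simplified stand-ins for the real CatalogTable / HiveExternalCatalog logic.
final case class TableSketch(
    schema: Seq[(String, String)],          // (column name, column type)
    partitionColumnNames: Seq[String],
    bucketColumnNames: Seq[String],
    serde: Option[String])

def prepareForMetastore(table: TableSketch, hiveCompatible: Boolean): TableSketch = {
  if (!hiveCompatible) {
    // 2.1 Not Hive compatible: blank out schema, partition columns and bucket spec before
    //     saving to the metastore; the case-preserving schema is still kept in table properties.
    table.copy(schema = Nil, partitionColumnNames = Nil, bucketColumnNames = Nil)
  } else {
    // 2.2 Hive compatible: keep the schema and fill in serde information so Hive and other
    //     metastore clients can still read the table.
    table.copy(serde = Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
  }
}
```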
Test build #64656 has finished for PR 14750 at commit
Test build #64655 has finished for PR 14750 at commit
Test build #64661 has finished for PR 14750 at commit
retest this please
Test build #64691 has finished for PR 14750 at commit
Test build #64980 has finished for PR 14750 at commit
Test build #64998 has finished for PR 14750 at commit
We still need to resolve the case sensitivity issues. To reproduce the issue, below is the SQL:
Below is the error we got:
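The repro SQL and error output are not reproduced in this excerpt. As a rough illustration only, the kind of case-sensitivity problem being discussed looks like the hypothetical sketch below (the table name, path, and behavior notes are assumptions, not the original repro):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("case-sensitivity-sketch")
  .enableHiveSupport()
  .getOrCreate()

// Write Parquet data with a mixed-case column name.
spark.range(10).selectExpr("id AS colA").write.mode("overwrite").parquet("/tmp/case_sketch")

// Register it as a Hive serde table; the Hive metastore lower-cases column names.
spark.sql(
  "CREATE EXTERNAL TABLE case_sketch (colA BIGINT) STORED AS PARQUET LOCATION '/tmp/case_sketch'")

// If the lower-cased metastore schema ("cola") is used instead of the case-preserving schema
// stored in table properties, the Parquet reader cannot resolve the column "colA".
spark.sql("SELECT colA FROM case_sketch").show()
```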
Test build #65040 has finished for PR 14750 at commit
// Hive metastore may change the table schema, e.g. schema inference. If the table
// schema we read back is different from the one in table properties which was written
// when creating table, we should respect the table schema from hive.
table.copy(properties = getOriginalTableProperties(table))
Shouldn't we still restore the original mixed case column names when possible?
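One way to picture the reconciliation this question is getting at: if the schema read back from the metastore differs from the schema saved in table properties only by case (and top-level nullability), the original mixed-case names can be restored safely. A small sketch under that assumption (helper names are made up, not the PR's code):

```scala
import org.apache.spark.sql.types.StructType

// True when the two schemas have the same columns and types, ignoring name case and
// top-level nullability.
def sameIgnoringCaseAndNullability(a: StructType, b: StructType): Boolean =
  a.length == b.length && a.fields.zip(b.fields).forall { case (x, y) =>
    x.name.equalsIgnoreCase(y.name) && x.dataType == y.dataType
  }

def reconcileSchema(fromMetastore: StructType, fromTableProps: StructType): StructType =
  if (sameIgnoringCaseAndNullability(fromMetastore, fromTableProps)) fromTableProps
  else fromMetastore // schemas genuinely diverged (e.g. Hive schema inference): trust Hive
```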
…d to answer a query

(This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.)

## What changes were proposed in this pull request?

In a new Spark session, when a partitioned Hive table is converted to use Spark's `HadoopFsRelation` in `HiveMetastoreCatalog`, metadata for every partition of that table are retrieved from the metastore and loaded into driver memory. In addition, every partition's metadata files are read from the filesystem to perform schema inference.

If a user queries such a table with predicates which prune that table's partitions, we would like to be able to answer that query without consulting partition metadata which are not involved in the query. When querying a table with a large number of partitions for some data from a small number of partitions (maybe even a single partition), the current conversion strategy is highly inefficient. I suspect this scenario is not uncommon in the wild.

In addition to being inefficient in running time, the current strategy is inefficient in its use of driver memory. When the sum of the number of partitions of all tables loaded in a driver reaches a certain level (somewhere in the tens of thousands), their cached data exhaust all driver heap memory in the default configuration. I suspect this scenario is less common (in that not too many deployments work with tables with tens of thousands of partitions), however this does illustrate how large the memory footprint of this metadata can be. With tables with hundreds or thousands of partitions, I would expect the `HiveMetastoreCatalog` table cache to represent a significant portion of the driver's heap space.

This PR proposes an alternative approach. Basically, it makes four changes:

1. It adds a new method, `listPartitionsByFilter`, to the Catalyst `ExternalCatalog` trait which returns the partition metadata for a given sequence of partition pruning predicates.
1. It refactors the `FileCatalog` type hierarchy to include a new `TableFileCatalog` to efficiently return files only for partitions matching a sequence of partition pruning predicates.
1. It removes partition loading and caching from `HiveMetastoreCatalog`.
1. It adds a new Catalyst optimizer rule, `PruneFileSourcePartitions`, which applies a plan's partition-pruning predicates to prune out unnecessary partition files from a `HadoopFsRelation`'s underlying file catalog.

The net effect is that when a query over a partitioned Hive table is planned, the analyzer retrieves the table metadata from `HiveMetastoreCatalog`. As part of this operation, the `HiveMetastoreCatalog` builds a `HadoopFsRelation` with a `TableFileCatalog`. It does not load any partition metadata or scan any files. The optimizer prunes away unnecessary table partitions by sending the partition-pruning predicates to the relation's `TableFileCatalog`. The `TableFileCatalog` in turn calls the `listPartitionsByFilter` method on its external catalog. This queries the Hive metastore, passing along those filters.

As a bonus, performing partition pruning during optimization leads to a more accurate relation size estimate. This, along with c481bdf, can lead to automatic, safe application of the broadcast optimization in a join where it might previously have been omitted.

## Open Issues

1. This PR omits partition metadata caching. I can add this once the overall strategy for the cold path is established, perhaps in a future PR.
1. This PR removes and omits partitioned Hive table schema reconciliation. As a result, it fails to find Parquet schema columns with upper case letters because of the Hive metastore's case-insensitivity. This issue may be fixed by apache#14750, but that PR appears to have stalled. ericl has contributed to this PR a workaround for Parquet wherein schema reconciliation occurs at query execution time instead of planning. Whether ORC requires a similar patch is an open issue.
1. This PR omits an implementation of `listPartitionsByFilter` for the `InMemoryCatalog`.
1. This PR breaks parquet log output redirection during query execution. I can work around this by running `Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$")` first thing in a Spark shell session, but I haven't figured out how to fix this properly.

## How was this patch tested?

The current Spark unit tests were run, and some ad-hoc tests were performed to validate that only the necessary partition metadata is loaded.

Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes apache#14690 from mallman/spark-16980-lazy_partition_fetching.
Test build #68070 has finished for PR 14750 at commit
Test build #68071 has finished for PR 14750 at commit
// If this is an external data source table...
if (tableDefinition.tableType == EXTERNAL &&
if (tableDefinition.provider.get != "hive" && tableDefinition.tableType == EXTERNAL &&
We should avoid calling this method for Hive tables in the try-catch.
private def requireTableExists(db: String, table: String): Unit = {
withClient { getTable(db, table) }
/**
* Get the raw table metadata from hive metastore directly.
Let's also define "raw" here.
val message = s"Persisting Hive serde table $qualifiedTableName into Hive metastore."
val tableWithDataSourceProps = tableDefinition.copy(
  properties = tableDefinition.properties ++ tableProperties)
(Some(tableWithDataSourceProps), message)
Add some comments here.
// If it's a managed table and we are renaming it, then the path option becomes inaccurate
// and we need to update it according to the new table name.
val hasPathOption = new CaseInsensitiveMap(rawTable.storage.properties).contains("path")
val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) {
Let's also explain why we are not changing locationUri here (we need to mention that Hive will change it inside alterTable; see https://github.com/apache/spark/pull/14750/files#diff-159191585e10542f013cb3a714f26075R448).
// field. We should respect the old `locationUri` even it's None.
val oldLocation = getLocationFromStorageProps(oldTableDef)
if (oldLocation == newLocation) {
  storageWithPathOption.copy(locationUri = oldTableDef.storage.locationUri)
Let's use an example to explain what we are doing here.
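For illustration, a self-contained sketch of the idea in the diff above, with made-up types and values (not the PR's code): the "path" storage property and locationUri can describe the same location in different forms, so when the path option is unchanged we keep the old locationUri, even if it is None.

```scala
final case class StorageSketch(locationUri: Option[String], properties: Map[String, String])

def mergeLocation(oldStorage: StorageSketch, updatedStorage: StorageSketch): StorageSketch = {
  val oldPath = oldStorage.properties.get("path")
  val newPath = updatedStorage.properties.get("path")
  if (oldPath == newPath) {
    // Same data location: respect the old locationUri, even if it is None.
    updatedStorage.copy(locationUri = oldStorage.locationUri)
  } else {
    updatedStorage
  }
}

val before = StorageSketch(Some("file:/warehouse/t"), Map("path" -> "/warehouse/t"))
val after  = StorageSketch(Some("/warehouse/t"), Map("path" -> "/warehouse/t"))
mergeLocation(before, after) // keeps locationUri = Some("file:/warehouse/t")
```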
@@ -537,22 +559,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table
} else {
getProviderFromTableProperties(table).map { provider =>
Seems we need to make this part more reader-friendly. Maybe something like:
getProviderFromTableProperties(table) match {
case None => // regular hive tables
case Some("hive") => // hive serde table created by spark 2.1 or higher version
case Some(other) => // data source table
}
@@ -620,7 +667,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
getTable(db, table).partitionColumnNames.foreach { colName =>
  orderedPartitionSpec.put(colName, partition(colName))
// Lowercase the partition column names before passing the partition spec to Hive client, as
// Hive metastore is not case preserving.
Let's mention that Hive will lower-case the partition column names, so if we do not convert the names to lower case, Hive will complain.
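A small sketch of that lower-casing, using stand-in parameters for the values available in HiveExternalCatalog (illustrative only, not the method from the diff):

```scala
import java.util

def toHivePartitionSpec(
    partitionColumnNames: Seq[String],
    spec: Map[String, String]): util.LinkedHashMap[String, String] = {
  val ordered = new util.LinkedHashMap[String, String]()
  partitionColumnNames.foreach { colName =>
    // Hive lower-cases partition column names, so pass lower-cased keys to the Hive client.
    ordered.put(colName.toLowerCase, spec(colName))
  }
  ordered
}

toHivePartitionSpec(Seq("partCol"), Map("partCol" -> "1")) // {partcol=1}
```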
Could you also verify that spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala Line 108 in 90d3b91
Test build #68117 has finished for PR 14750 at commit
Test build #68134 has finished for PR 14750 at commit
I tried this out locally and it works, even with all the parquet / orc readers reverted. Just some questions.
if (tableDefinition.tableType == VIEW) {
  client.createTable(tableDefinition, ignoreIfExists)
} else if (tableDefinition.provider.get == "hive") {
Can we compare against a constant instead of a string literal here?
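A minimal sketch of that suggestion: compare against a named constant rather than the literal "hive". Spark keeps such a constant (e.g. in DDLUtils), but its exact name and location are not shown in this thread, so the object below is illustrative:

```scala
object HiveProvider {
  val Name: String = "hive"
}

def isHiveSerdeTable(provider: Option[String]): Boolean =
  provider.contains(HiveProvider.Name)
```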
// will be updated automatically in Hive metastore by the `alterTable` call at the end of this
// method. Here we only update the path option if the path option already exists in storage
// properties, to avoid adding a unnecessary path option for Hive serde tables.
val hasPathOption = new CaseInsensitiveMap(rawTable.storage.properties).contains("path")
Is this change related to this PR?
not related, but it's a follow up of https://github.com/apache/spark/pull/15024/files#diff-159191585e10542f013cb3a714f26075R422 , which may add an extra path option to Hive serde tables.
val propsWithoutPath = table.storage.properties.filterKeys(_.toLowerCase != "path")
val propsWithoutPath = table.storage.properties.filter {
  case (k, v) => k.toLowerCase != "path"
}
Is this equivalent?
No, the map returned by filterKeys is not serializable. I'll add some comment.
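A self-contained illustration of that point on Scala 2.11/2.12: `filterKeys` returns a lazy view whose class is not serializable, while `filter` builds a plain (serializable) Map. The property values below are made up:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

def isJavaSerializable(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }

val props = Map("path" -> "/tmp/t", "serialization.format" -> "1")

val viaFilterKeys = props.filterKeys(_.toLowerCase != "path")           // lazy, non-serializable view
val viaFilter = props.filter { case (k, _) => k.toLowerCase != "path" } // plain, serializable Map

isJavaSerializable(viaFilterKeys) // false
isJavaSerializable(viaFilter)     // true
```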
// old `locationUri` even it's None.
val storageWithNewLocation = if (oldLocation == newLocation) {
  oldTableDef.storage
val newStorage = if (tableDefinition.provider.get == "hive") {
Also compare against constant?
// schema we read back is different(ignore case and nullability) from the one in table
// properties which was written when creating table, we should respect the table schema
// from hive.
hiveTable
Are there cases where this could be an issue? Should we log a warning here?
good idea!
if (!isDataSourceTable) {
  fail(
    s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
    s"${HadoopFsRelation.getClass.getCanonicalName}.")
}
userSpecifiedLocation match {
  case Some(location) =>
    assert(table.storage.locationUri.get === location)
    assert(r.options("path") === location)
Should we check the location uri too?
it's also a follow-up: https://github.com/apache/spark/pull/15024/files#r86273774
Overall LGTM. I can follow up with a PR to remove the parquet and orc reader hacks after this goes in.
// from hive.
logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
  "different from the schema when this table was created by Spark SQL" +
  s"(${schemaFromTableProps.simpleString}). We have to trust the table schema from Hive " +
Nit: clarify that we are falling back to the hive metastore schema. Trusting it sounds a little ambiguous.
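Illustrative only: one possible wording along the lines of that nit, making the fallback explicit. The parameter names stand in for values available in HiveExternalCatalog; this is not the exact message merged in the PR:

```scala
def schemaMismatchWarning(metastoreSchema: String, schemaFromTableProps: String): String =
  s"The table schema given by Hive metastore ($metastoreSchema) is different from the " +
  s"schema when this table was created by Spark SQL ($schemaFromTableProps). Falling back " +
  "to the Hive metastore schema, which is not case preserving."
```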
Test build #68176 has finished for PR 14750 at commit
Test build #68181 has finished for PR 14750 at commit
Merging in master/branch-2.1. Thanks.
…ema to table properties like data source table

## What changes were proposed in this pull request?

For data source tables, we will put its table schema, partition columns, etc. to table properties, to work around some hive metastore issues, e.g. not case-preserving, bad decimal type support, etc.

We should also do this for hive serde tables, to reduce the difference between hive serde tables and data source tables, e.g. column names should be case preserving.

## How was this patch tested?

existing tests, and a new test in `HiveExternalCatalog`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14750 from cloud-fan/minor1.

(cherry picked from commit 95ec4e2)
Signed-off-by: Reynold Xin <rxin@databricks.com>
What changes were proposed in this pull request?
For data source tables, we put their table schema, partition columns, etc. into table properties, to work around some Hive metastore issues, e.g. not being case-preserving and having poor decimal type support.
We should also do this for Hive serde tables, to reduce the difference between Hive serde tables and data source tables, e.g. column names should be case-preserving.
How was this patch tested?
Existing tests, and a new test in HiveExternalCatalog.
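A minimal sketch of the general technique the description refers to, assuming the same "split the schema JSON across several table properties" scheme Spark already uses for data source tables (the spark.sql.sources.schema.* keys); the part size and helper name are simplified stand-ins, not the exact code from this PR:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

def schemaToTableProperties(schema: StructType, partSize: Int = 4000): Map[String, String] = {
  // The schema is stored as JSON, which preserves column-name case, exact decimal types, etc.
  val parts = schema.json.grouped(partSize).toSeq
  val indexed = parts.zipWithIndex.map { case (part, i) =>
    s"spark.sql.sources.schema.part.$i" -> part
  }.toMap
  indexed + ("spark.sql.sources.schema.numParts" -> parts.size.toString)
}

val schema = StructType(Seq(
  StructField("colA", IntegerType), // mixed-case name survives in the JSON
  StructField("colB", StringType)))

schemaToTableProperties(schema).foreach { case (k, v) => println(s"$k -> ${v.take(60)}") }
```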