[SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query #14690

Closed

Conversation

@mallman (Contributor) commented Aug 17, 2016

(This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.)

What changes were proposed in this pull request?

In a new Spark session, when a partitioned Hive table is converted to use Spark's HadoopFsRelation in HiveMetastoreCatalog, metadata for every partition of that table are retrieved from the metastore and loaded into driver memory. In addition, every partition's metadata files are read from the filesystem to perform schema inference.

If a user queries such a table with predicates which prune that table's partitions, we would like to be able to answer that query without consulting partition metadata which are not involved in the query. When querying a table with a large number of partitions for some data from a small number of partitions (maybe even a single partition), the current conversion strategy is highly inefficient. I suspect this scenario is not uncommon in the wild.

In addition to being inefficient in running time, the current strategy is inefficient in its use of driver memory. When the sum of the number of partitions of all tables loaded in a driver reaches a certain level (somewhere in the tens of thousands), their cached data exhaust all driver heap memory in the default configuration. I suspect this scenario is less common (in that not too many deployments work with tables with tens of thousands of partitions), however this does illustrate how large the memory footprint of this metadata can be. With tables with hundreds or thousands of partitions, I would expect the HiveMetastoreCatalog table cache to represent a significant portion of the driver's heap space.

This PR proposes an alternative approach. Basically, it makes four changes:

  1. It adds a new method, listPartitionsByFilter, to the Catalyst ExternalCatalog trait; the method returns the partition metadata for a given sequence of partition pruning predicates.
  2. It refactors the FileCatalog type hierarchy to include a new TableFileCatalog to efficiently return files only for partitions matching a sequence of partition pruning predicates.
  3. It removes partition loading and caching from HiveMetastoreCatalog.
  4. It adds a new Catalyst optimizer rule, PruneFileSourcePartitions, which applies a plan's partition-pruning predicates to prune out unnecessary partition files from a HadoopFsRelation's underlying file catalog.

The net effect is that when a query over a partitioned Hive table is planned, the analyzer retrieves the table metadata from HiveMetastoreCatalog. As part of this operation, the HiveMetastoreCatalog builds a HadoopFsRelation with a TableFileCatalog. It does not load any partition metadata or scan any files. The optimizer prunes away unnecessary table partitions by sending the partition-pruning predicates to the relation's TableFileCatalog. The TableFileCatalog in turn calls the listPartitionsByFilter method on its external catalog. This queries the Hive metastore, passing along those filters.

As a bonus, performing partition pruning during optimization leads to a more accurate relation size estimate. This, along with c481bdf, can lead to automatic, safe application of the broadcast optimization in a join where it might previously have been omitted.
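
To make this flow concrete, here is a minimal, self-contained Scala sketch. The names echo the ones above (listPartitionsByFilter, TableFileCatalog), but the types and signatures are simplified stand-ins for illustration, not the actual Catalyst classes.

```scala
object PartitionPruningSketch {
  // Simplified stand-ins for the concepts in this PR, not the real Spark classes.
  final case class PartitionSpec(values: Map[String, String], files: Seq[String], bytes: Long)

  // Analogue of the new ExternalCatalog.listPartitionsByFilter: the catalog, not the
  // driver, decides which partitions satisfy the pruning predicates.
  trait ExternalCatalogLike {
    def listPartitionsByFilter(db: String, table: String,
                               pred: Map[String, String] => Boolean): Seq[PartitionSpec]
  }

  // Analogue of TableFileCatalog: holds no partition metadata up front and asks the
  // catalog only for the partitions a query actually needs.
  final class TableFileCatalogLike(catalog: ExternalCatalogLike, db: String, table: String) {
    def listFiles(pred: Map[String, String] => Boolean): Seq[String] =
      catalog.listPartitionsByFilter(db, table, pred).flatMap(_.files)
    def sizeInBytes(pred: Map[String, String] => Boolean): Long =
      catalog.listPartitionsByFilter(db, table, pred).map(_.bytes).sum
  }

  def main(args: Array[String]): Unit = {
    // A fake two-partition table standing in for the Hive metastore.
    val metastore = new ExternalCatalogLike {
      private val parts = Seq(
        PartitionSpec(Map("ds" -> "20160914"), Seq("/tbl/ds=20160914/part-0.parquet"), 100L),
        PartitionSpec(Map("ds" -> "20160915"), Seq("/tbl/ds=20160915/part-0.parquet"), 200L))
      def listPartitionsByFilter(db: String, table: String,
                                 pred: Map[String, String] => Boolean): Seq[PartitionSpec] =
        parts.filter(p => pred(p.values))
    }
    val fileCatalog = new TableFileCatalogLike(metastore, "event", "bid_request")
    // Only the ds=20160915 partition is ever materialized on the driver.
    println(fileCatalog.listFiles(_.get("ds").contains("20160915")))
  }
}
```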

Open Issues

  1. This PR omits partition metadata caching. I can add this once the overall strategy for the cold path is established, perhaps in a future PR.
  2. This PR removes and omits partitioned Hive table schema reconciliation. As a result, it fails to find Parquet schema columns with upper case letters because of the Hive metastore's case-insensitivity. This issue may be fixed by [SPARK-17183][SPARK-17983][SPARK-18101][SQL] put hive serde table schema to table properties like data source table #14750, but that PR appears to have stalled. @ericl has contributed to this PR a workaround for Parquet wherein schema reconciliation occurs at query execution time instead of at planning time. Whether ORC requires a similar patch is an open issue.
  3. This PR omits an implementation of listPartitionsByFilter for the InMemoryCatalog.
  4. This PR breaks parquet log output redirection during query execution. I can work around this by running Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$") first thing in a Spark shell session, but I haven't figured out how to fix this properly.

How was this patch tested?

The current Spark unit tests were run, and some ad-hoc tests were performed to validate that only the necessary partition metadata is loaded.

@ericl (Contributor) commented Aug 17, 2016

cc @cloud-fan this seems relevant to your work around making Hive work as a datasource.

@SparkQA commented Aug 17, 2016

Test build #63931 has finished for PR 14690 at commit 2f96b8a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class SessionFileCatalog(
    • class TableFileCatalog(
    • trait BasicFileCatalog
    • trait FileCatalog extends BasicFileCatalog

@cloud-fan (Contributor) commented:
If a user queries such a table with predicates which prune that table's partitions, we would like to be able to answer that query without consulting partition metadata which are not involved in the query.

When we read a partitioned Hive table, we retrieve all partition metadata from the Hive metastore and load it into driver memory. Yes, that's not very efficient and may blow up the driver.

However, this only happens on the first read; after that, the data are cached. If you don't load all partition metadata on the first read, how are you going to deal with the cache?

@ericl (Contributor) commented Aug 18, 2016

@cloud-fan if an external catalog supports pruning, we shouldn't try to materialize it in memory, since the catalog can probably do a better job of caching and pruning than we can. Currently you can only get this behavior with un-converted Hive tables + the Hive partition pruning flag enabled, which is necessary to get reasonable analysis times for small queries over tables with many partitions.
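
For reference, the un-converted path can be exercised with settings along these lines in a spark-shell session (the flag names here are a recollection of the Spark 2.x configs, not something quoted in this thread, so treat them as an assumption):

```scala
// Assumed config names (not quoted from this thread): keep the Hive reader path by
// disabling Parquet conversion, and push partition predicates down to the metastore.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.conf.set("spark.sql.hive.metastorePartitionPruning", "true")
```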

@mallman (Contributor, Author) commented Aug 18, 2016

@cloud-fan Actually, all Hive table partition metadata are retrieved for every query analysis. The freshly retrieved metadata are then compared with the cached metadata. If they're the same, we can use the cached table. However, if they differ, all of the partition files are loaded, the schema is inferred, and a new HadoopFsRelation is created and cached.

@mallman (Contributor, Author) commented Aug 18, 2016

I've been thinking some more about the metastore/file schema reconciliation. As I mentioned in the PR description, this patch omits this reconciliation. This causes failures when the parquet files have columns with upper case characters, among other cases.

I mentioned trying to add a new schema method to the relation or its file catalog which would read the file schema for only a subset of the partitions. However, I quickly determined that would require a fundamental change in the way BaseRelation instances are handled throughout the codebase and decided that was a nonstarter.

I have another idea I'd like your thoughts on. Rather than defer partition pruning to physical planning, what if we did that in the optimization phase and performed the hive MetastoreRelation conversion in an optimization rule? This rule would build a HadoopFsRelation based on the partition pruning predicates. We could add it to a batch after all of the pruning predicates have been pushed down and set it to run once. In that kind of scenario, I believe we could leave the HadoopFsRelation and FileCatalog types as they are in the current codebase. We'd just be building a FileCatalog restricted to the partitions we identify in the optimization phase. Physical planning would proceed as usual.

In this approach, we should be able to do "just in time" schema reconciliation/merging based only on the selected partitions. And we could address an issue I've long been frustrated by: the fact that a BaseRelation's size is defined regardless of the partitions we're using. By creating a HadoopFsRelation restricted to the partitions we care about, we can compute a precise size for the relation tailored to our query. This would greatly help the automatic broadcast join conversion heuristic.

Thoughts? I may start fiddling around with this when I have time today. It's possible I'm missing a crucial detail that makes this approach impractical.

@ericl (Contributor) commented Aug 18, 2016

I was thinking that the table schema should be provided by the catalog (either FileCatalog or ExternalCatalog?), so that it would be consistent for the whole table and yet not require a full file scan (since the catalog would presumably maintain the full table schema internally). Not sure how easy this is to pull off with the current HiveMetastoreCatalog though.

I don't think we can allow schemas to vary based on the selected partitions. That can be confusing, since certain fields could then appear/disappear with different predicates provided.

@mallman (Contributor, Author) commented Aug 18, 2016

@ericl That would be ideal; however, the Hive metastore does not faithfully record column names with upper case characters. So if you save a parquet file with a column named myCol and define an external Hive table over it with a column named myCol, Hive will not respect the case-sensitive column name. When reading the schema from the metastore, the column name will be mycol, and Spark will fail to find that column in the parquet file.
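
As a rough illustration of the execution-time workaround mentioned in the description (a sketch under my own simplifications, not the actual contributed code): resolve each lowercased metastore column against the Parquet file's field names case-insensitively, keeping the file's original casing and bailing out on ambiguity.

```scala
// Sketch only: match lowercased metastore column names to Parquet field names
// case-insensitively; an ambiguous match (two fields differing only by case) yields None.
def reconcile(metastoreCols: Seq[String], parquetCols: Seq[String]): Seq[Option[String]] = {
  val byLowerCase = parquetCols.groupBy(_.toLowerCase)
  metastoreCols.map { col =>
    byLowerCase.get(col.toLowerCase).collect { case Seq(unique) => unique }
  }
}

// reconcile(Seq("mycol"), Seq("myCol"))          == Seq(Some("myCol"))
// reconcile(Seq("mycol"), Seq("myCol", "MYCOL")) == Seq(None)  -- ambiguous under case folding
```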

@ericl (Contributor) commented Aug 18, 2016

Isn't case sensitivity disabled by default in spark?

@mallman (Contributor, Author) commented Aug 18, 2016

It may be, but we at least need to get the unit tests working. And they use mixed case column names. :)

@cloud-fan (Contributor) commented:
So if you save a parquet file with a column named myCol and define an external Hive table over it with a column named myCol Hive will not respect the case sensitive column name.

This will be fixed soon (maybe within a week), using the same approach as for data source tables.

@mallman (Contributor, Author) commented Aug 19, 2016

@cloud-fan Ooooo... how exciting! Is there a PR?

@cloud-fan (Contributor) commented:
Not yet, but I can make one right after #14155 is merged.

@mallman (Contributor, Author) commented Aug 19, 2016

That looks great!

@mallman (Contributor, Author) commented Aug 31, 2016

@cloud-fan Now that #14155 is merged, have you started on a follow up PR to address the column name case-insensitivity issue? I've rebased and done some more work on this PR, but I'd like to examine your changes to make sure they're compatible.

@cloud-fan (Contributor) commented:
Yea, there it is: #14750.

@mallman (Contributor, Author) commented Sep 1, 2016

Thanks, that looks great.

@mallman force-pushed the spark-16980-lazy_partition_fetching branch from de59d5f to 920a180 on September 2, 2016 00:01
@SparkQA commented Sep 2, 2016

Test build #64817 has finished for PR 14690 at commit 920a180.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman (Contributor, Author) commented Sep 2, 2016

I've rebased and pushed a new commit which prunes a HadoopFsRelation's TableFileCatalog to a ListingFileCatalog for a given set of partition pruning expressions. Another upside of this change is that HadoopFsRelation.sizeInBytes now returns the size of the requested data partitions instead of the size of the entire table.
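
To illustrate why this matters for planning, here is a small sketch with simplified, made-up types and numbers (not the Spark internals): a pruned listing reports only the selected partitions' bytes, which is the figure the broadcast-join heuristic compares against its threshold.

```scala
// Illustrative only: sizeInBytes over the pruned file listing rather than the whole table.
final case class PrunedFileListing(fileSizesInBytes: Seq[Long]) {
  def sizeInBytes: Long = fileSizesInBytes.sum
}

// Broadcast heuristic sketch; the 10 MB default threshold is an assumption here.
def wouldBroadcast(listing: PrunedFileListing,
                   autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024): Boolean =
  listing.sizeInBytes <= autoBroadcastJoinThreshold

// A table-wide size would typically exceed the threshold, while one pruned partition
// of a few MB need not: wouldBroadcast(PrunedFileListing(Seq(3L * 1024 * 1024))) == true
```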

I'll do another rebase when #14750 has been merged.

@SparkQA commented Sep 2, 2016

Test build #64815 has finished for PR 14690 at commit de59d5f.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@@ -184,7 +184,7 @@ case class FileSourceScanExec(
       "Batched" -> supportsBatch.toString,
       "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
       "PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-      "InputPaths" -> relation.location.paths.mkString(", "))
+      "RootPaths" -> relation.location.rootPaths.mkString(", "))
@ericl (Contributor) commented on Sep 2, 2016:
We should make sure the physical plan still has a good debug string when you call explain (i.e. tells which catalog it's using) since that will greatly impact performance in this case.

@mallman (Contributor, Author) replied:
I'm not quite sure what you're asking for here. If I replace "RootPaths" with an entry

"Location" -> relation.location

and implement appropriate toString methods for the relation.location classes, does that satisfy your concern?
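
Concretely, the idea would look something like this (simplified stand-ins, not the actual FileSourceScanExec or file catalog classes): each catalog's toString names its class, so the explain output distinguishes a metastore-backed catalog from a plain listing catalog.

```scala
// Simplified stand-ins for illustration only.
trait FileCatalogLike { def rootPaths: Seq[String] }

final class TableFileCatalogStub(val rootPaths: Seq[String]) extends FileCatalogLike {
  override def toString: String = s"TableFileCatalog(${rootPaths.mkString(", ")})"
}

final class ListingFileCatalogStub(val rootPaths: Seq[String]) extends FileCatalogLike {
  override def toString: String = s"ListingFileCatalog(${rootPaths.mkString(", ")})"
}

// A single "Location" entry in the scan's metadata then reveals which catalog backs the scan.
def scanMetadata(location: FileCatalogLike, partitionFilters: Seq[String]): Map[String, String] =
  Map(
    "Location"         -> location.toString,
    "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"))
```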

@ericl (Contributor) replied:

Yeah, I think mainly you want to be able to differentiate between a metastore-backed and pure file-catalog relation.

@mallman (Contributor, Author) commented Sep 3, 2016

Thanks @ericl and @davies for your latest feedback. I'd like to take the opportunity to rebase this PR on #14750 after it's merged to master before pushing another commit; however, I understand the current feeling is to revert the partition pruning out of planning. I'd like to get your thoughts on proposing to put pruning into planning in a different PR, given the viewpoint I expressed.

@ericl (Contributor) commented Sep 3, 2016

Yeah, I wouldn't rule out making that change later since it can get us significantly better planning, but I also think we should find a better way of doing that than making pruned copies of HadoopFsRelation.

ericl added 2 commits October 14, 2016 14:40
* [SPARK-16980][SQL] Load only catalog table partition metadata required
to answer a query

* Add a new catalyst optimizer rule to SQL core for pruning unneeded
partitions' files from a table file catalog

* Include the type of file catalog in the FileSourceScanExec metadata

* TODO: Consider renaming FileCatalog to better differentiate it from
BasicFileCatalog (or vice-versa)

* try out parquet case insensitive fallback

* Refactor the FileSourceScanExec.metadata val to make it prettier

* fix and add test for input files

* rename test

* Refactor `TableFileCatalog.listFiles` to call `listDataLeafFiles` once
instead of once per partition

* fix it

* more test cases

* also fix a bug with zero partitions selected

* feature flag

* add comments

* extend and fix flakiness in test

* Enhance `ParquetMetastoreSuite` with mixed-case partition columns

* Tidy up a little by removing some unused imports, an unused method and
moving a protected method down and making it private

* Put partition count in `FileSourceScanExec.metadata` for partitioned
tables

* Fix some errors in my revision of `ParquetSourceSuite`

* Thu Oct 13 17:18:14 PDT 2016

* more generic

* Thu Oct 13 18:09:42 PDT 2016

* Thu Oct 13 18:09:55 PDT 2016

* Thu Oct 13 18:22:31 PDT 2016

* Thu Oct 13 19:02:36 PDT 2016

* Thu Oct 13 19:03:06 PDT 2016
@ericl (Contributor) commented Oct 14, 2016

My main concern is that the new FileCatalog class hierarchy is really complex now. Can we simplify it a little? Or at least put a class hierarchy chart (or a text explanation) in the PR description to make it easier for other reviewers to understand this new hierarchy.

One low-hanging fruit here is to unify SessionFileCatalog with PartitionAwareFileCatalog. It seems that after some of the changes here they no longer need to be separate. This is a PR I can add on after the rebase is finished.

I also think we should rename some of the classes, but since this would be an invasive change we can handle it in a follow-up.

@ericl (Contributor) commented Oct 14, 2016

btw, what's the parquet log redirection issue? I don't see anything unusual in spark shell.

@rxin (Contributor) commented Oct 14, 2016

We can also merge this first and then do clean-ups next week, BTW. Otherwise this pull request will continue getting conflicts every day. I'd prioritize getting this to a mergeable state and then merging it ASAP.

* Fri Oct 14 14:04:01 PDT 2016

* stray println
@mallman force-pushed the spark-16980-lazy_partition_fetching branch from 5a5036b to 014c998 on October 14, 2016 22:04
@mallman (Contributor, Author) commented Oct 14, 2016

I just pushed the rebase. It was really hairy, but I tried hard to ensure I got essentially all three branches' changes in.

@mallman (Contributor, Author) commented Oct 14, 2016

btw, what's the parquet log redirection issue? I don't see anything unusual in spark shell.

Whenever I run a query on a Hive parquet table I get

spark-sql> select eid from event.bid_request where ds=20160915 limit 10;
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".                
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Oct 14, 2016 10:07:03 PM WARNING: org.apache.parquet.CorruptStatistics: Ignoring statistics because created_by could not be parsed (see PARQUET-251): parquet-mr version 1.6.0
org.apache.parquet.VersionParser$VersionParseException: Could not parse created_by: parquet-mr version 1.6.0 using format: (.+) version ((.*) )?\(build ?(.*)\)
    at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
    at org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:60)
    at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:263)
    at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:567)
    at org.apache.parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:544)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:431)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:386)
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:107)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:368)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:342)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:149)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:372)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Oct 14, 2016 10:07:03 PM WARNING: org.apache.parquet.CorruptStatistics: Ignoring statistics because created_by could not be parsed (see PARQUET-251): parquet-mr version 1.6.0
org.apache.parquet.VersionParser$VersionParseException: Could not parse created_by: parquet-mr version 1.6.0 using format: (.+) version ((.*) )?\(build ?(.*)\)
    at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
    at org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:60)
    at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:263)
    at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:567)
...

And this is really just a small fraction of the warning output I get.

I looked into this a couple months back. The underlying problem is detailed in https://issues.apache.org/jira/browse/PARQUET-349. A fix has been merged into their codebase, but it's not in a release yet.
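
One possible stopgap while the redirection itself remains broken, assuming the noise arrives via java.util.logging (which the "Oct 14, 2016 ... WARNING:" format suggests), is to raise the level of the org.apache.parquet JUL logger. This only mutes the warnings rather than restoring redirection, and in a distributed run the same would have to happen on the executors:

```scala
// Assumed stopgap, not a fix: silence parquet-mr's java.util.logging warnings.
import java.util.logging.{Level, Logger}

// Keep a reference so the logger (and its level) isn't garbage-collected.
val parquetJulLogger: Logger = Logger.getLogger("org.apache.parquet")
parquetJulLogger.setLevel(Level.SEVERE)
```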

@ericl (Contributor) commented Oct 14, 2016

Hm, I haven't seen that with my test queries. Would adding your workaround to SparkILoopInit help?

@mallman (Contributor, Author) commented Oct 14, 2016

Hm, I haven't seen that with my test queries. Would adding your workaround to SparkILoopInit work?

It does not, unfortunately.

@mallman (Contributor, Author) commented Oct 14, 2016

Hm, I haven't seen that with my test queries. Would adding your workaround to SparkILoopInit work?

It does not, unfortunately.

I believe this impacts people with parquet files written from an older version of Spark using an older version of the parquet-mr libraries. We're using parquet-mr 1.7.0 in production. It's apparently not writing the "build" part of the version string that the 1.8.1 parquet reader is looking for.
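
The parse failure is easy to reproduce against the pattern quoted in the stack trace: the created_by string written by the older writer has no "(build ...)" suffix, and that part of the pattern is not optional. (The 1.8.1-style string below is made up for illustration.)

```scala
// The pattern from VersionParser, as quoted in the stack trace above.
val createdByPattern = """(.+) version ((.*) )?\(build ?(.*)\)""".r

createdByPattern.findFirstMatchIn("parquet-mr version 1.6.0")                 // None: no "(build ...)" part
createdByPattern.findFirstMatchIn("parquet-mr version 1.8.1 (build abc123)")  // Some(match)
```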

I'll work on fixing this, but unless I can get it done really quickly I probably won't make progress today. I just have too many things on my plate right now.

Cheers.

@SparkQA commented Oct 15, 2016

Test build #66992 has finished for PR 14690 at commit 014c998.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented Oct 15, 2016

Thanks a lot -- going to merge this. We should continue to clean it up as @cloud-fan suggested.

@asfgit closed this in 6ce1b67 on Oct 15, 2016
@ericl (Contributor) commented Oct 15, 2016

Fyi, I'm working on some of the mentioned refactoring and adding back
metadata caching. Will file JIRAs for those when I get a chance.

@mallman deleted the spark-16980-lazy_partition_fetching branch on October 18, 2016 22:46
ghost pushed a commit to dbtsai/spark that referenced this pull request Oct 19, 2016
## What changes were proposed in this pull request?

There was a bug introduced in apache#14690 which broke refreshByPath with converted hive tables (though, it turns out it was very difficult to refresh converted hive tables anyways, since you had to specify the exact path of one of the partitions).

This changes refreshByPath to invalidate by prefix instead of exact match, and fixes the issue.

cc sameeragarwal for refreshByPath changes
mallman

## How was this patch tested?

Extended unit test.

Author: Eric Liang <ekl@databricks.com>

Closes apache#15521 from ericl/fix-caching.
robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
[SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query

Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes apache#14690 from mallman/spark-16980-lazy_partition_fetching.
robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
Author: Eric Liang <ekl@databricks.com>

Closes apache#15521 from ericl/fix-caching.
asfgit pushed a commit that referenced this pull request Nov 10, 2016
(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17993)
## What changes were proposed in this pull request?

PR #14690 broke parquet log output redirection for converted partitioned Hive tables. For example, when querying parquet files written by Parquet-mr 1.6.0 Spark prints a torrent of (harmless) warning messages from the Parquet reader:

```
Oct 18, 2016 7:42:18 PM WARNING: org.apache.parquet.CorruptStatistics: Ignoring statistics because created_by could not be parsed (see PARQUET-251): parquet-mr version 1.6.0
org.apache.parquet.VersionParser$VersionParseException: Could not parse created_by: parquet-mr version 1.6.0 using format: (.+) version ((.*) )?\(build ?(.*)\)
    at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
    at org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:60)
    at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:263)
    at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:583)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:513)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:270)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:225)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:162)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:372)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```

This only happens during execution, not planning, and it doesn't matter what log level the `SparkContext` is set to. That's because Parquet (versions < 1.9) doesn't use slf4j for logging. Note, you can tell that log redirection is not working here because the log message format does not conform to the default Spark log message format.

This is a regression I noted as something we needed to fix as a follow up.

It appears that the problem arose because we removed the call to `inferSchema` during Hive table conversion. That call is what triggered the output redirection.

## How was this patch tested?

I tested this manually in four ways:
1. Executing `spark.sqlContext.range(10).selectExpr("id as a").write.mode("overwrite").parquet("test")`.
2. Executing `spark.read.format("parquet").load(legacyParquetFile).show` for a Parquet file `legacyParquetFile` written using Parquet-mr 1.6.0.
3. Executing `select * from legacy_parquet_table limit 1` for some unpartitioned Parquet-based Hive table written using Parquet-mr 1.6.0.
4. Executing `select * from legacy_partitioned_parquet_table where partcol=x limit 1` for some partitioned Parquet-based Hive table written using Parquet-mr 1.6.0.

I ran each test with a new instance of `spark-shell` or `spark-sql`.

Incidentally, I found that test case 3 was not a regression—redirection was not occurring in the master codebase prior to #14690.

I spent some time working on a unit test, but based on my experience working on this ticket I feel that automated testing here is far from feasible.

cc ericl dongjoon-hyun

Author: Michael Allman <michael@videoamp.com>

Closes #15538 from mallman/spark-17993-fix_parquet_log_redirection.

(cherry picked from commit b533fa2)
Signed-off-by: Reynold Xin <rxin@databricks.com>
asfgit pushed a commit that referenced this pull request Nov 10, 2016
(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17993)

Author: Michael Allman <michael@videoamp.com>

Closes #15538 from mallman/spark-17993-fix_parquet_log_redirection.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
[SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query

Author: Michael Allman <michael@videoamp.com>
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes apache#14690 from mallman/spark-16980-lazy_partition_fetching.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
Author: Eric Liang <ekl@databricks.com>

Closes apache#15521 from ericl/fix-caching.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17993)

Author: Michael Allman <michael@videoamp.com>

Closes apache#15538 from mallman/spark-17993-fix_parquet_log_redirection.