[SPARK-21056][SQL] Use at most one spark job to list files in InMemoryFileIndex #18269
bbossy wants to merge 5 commits into apache:master from
Conversation
Can one of the admins verify this patch?
Would it be safe to use the same instance of fs for all the paths in an InMemoryFileIndex? If so, I can move this back to where it was before.
Yes, it's the same instance and should be reused
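The reuse the reviewers agree on can be sketched like this (a hypothetical outline, not the patch itself; it assumes all paths in the index live on the same Hadoop filesystem, which is what the reviewer confirms above):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

object SharedFsSketch {
  // Obtain the FileSystem once from the first path and reuse it for every
  // listing call, instead of resolving it per path inside the loop.
  def listAll(paths: Seq[Path], hadoopConf: Configuration): Seq[FileStatus] =
    if (paths.isEmpty) {
      Nil
    } else {
      val fs = paths.head.getFileSystem(hadoopConf) // shared instance
      paths.flatMap(path => fs.listStatus(path))
    }
}
```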
Could you please update the PR description by copying the contents from the JIRA? Any performance numbers you can share?
I ran a synthetic scenario to show what changes, since deploying this branch would be more involved. I created two very simple relations on a small HDFS cluster (4 datanodes), running spark with master …

Setup: …
Using master branch before my commits: …
Using this PR: …

Is there something more specific that I should look into?
ping @gatorsmile @srowen and possibly @cloud-fan: would like to hear your thoughts on this.
how about

    if (paths.isEmpty) {
      Nil
    } else {
      val fs = paths.head.getFileSystem(hadoopConf)
      ......
    }
can we merge these flatMaps?
something like

    paths.flatMap { path =>
      try {
        val statuses = fs.listStatus(path)
        val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
        ....
      } catch ...
    }
then we can still keep the previous code structure
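Filling in the elided parts for illustration, the merged flatMap might look roughly like this (a hypothetical completion; the FileNotFoundException handling and the simplified `shouldFilterOut` are assumptions, not the actual patch):

```scala
import java.io.FileNotFoundException
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object MergedFlatMapSketch {
  // Simplified stand-in for InMemoryFileIndex.shouldFilterOut.
  def shouldFilterOut(name: String): Boolean =
    name.startsWith("_") || name.startsWith(".")

  // List each path with the shared FileSystem, drop filtered names, and
  // keep the (parent, status) pairing in one pass.
  def listFiltered(fs: FileSystem, paths: Seq[Path]): Seq[(Path, FileStatus)] =
    paths.flatMap { path =>
      try {
        val statuses = fs.listStatus(path)
        statuses
          .filterNot(status => shouldFilterOut(status.getPath.getName))
          .map(status => path -> status)
      } catch {
        // The directory may have been deleted since it was discovered;
        // Spark logs a warning and skips it.
        case _: FileNotFoundException => Nil
      }
    }
}
```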
nit: style issue. This if-then-else should be moved to the left by 2 spaces.
@bbossy I've built and deployed a branch of Spark 2.2 with your patch and compared its behavior to the same branch of Spark 2.2 without your patch. I'm seeing different behavior, but not what I expected. My test table has three partition columns; I use
@mallman I'm not sure where this difference in behaviour is coming from. The following test in … Does it match your scenario? I'll dig around a bit later to see if I can come up with an explanation.
@cloud-fan Could you take another look, please?
    try {
      val fStatuses = fs.listStatus(path)
      val filtered = fStatuses.filterNot(status => shouldFilterOut(status.getPath.getName))
      if (filtered.nonEmpty) {
nit: `filtered.map(path -> _)`, so that we don't need the if-else here, and the flatMap there
      fStatuses.map { f => path -> f }
    }.partition { case (_, fStatus) => fStatus.isDirectory }
    val pathsToList = dirs.map { case (_, fStatus) => fStatus.getPath }
    val nestedFiles = if (pathsToList.nonEmpty) {
do we need this if check?
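The refactor sketched across these review comments (pair every status with its parent path, then partition into directories and files) can be illustrated with plain collections; `FStatus` here is a stand-in for Hadoop's `FileStatus`, so the example stands alone:

```scala
// Stand-in for Hadoop's FileStatus, so the sketch runs without a cluster.
case class FStatus(path: String, isDirectory: Boolean)

object PartitionSketch {
  // Pair every status with its parent path, then split directories from
  // files in one pass. No emptiness check is needed: map and partition
  // both handle empty input naturally, which is why the if-else and the
  // if check questioned above can be dropped.
  def split(path: String, statuses: Seq[FStatus]): (Seq[(String, FStatus)], Seq[(String, FStatus)]) =
    statuses.map(status => path -> status)
      .partition { case (_, status) => status.isDirectory }

  def main(args: Array[String]): Unit = {
    val statuses = Seq(
      FStatus("/t/a=1", isDirectory = true),
      FStatus("/t/part-0", isDirectory = false))
    val (dirs, files) = split("/t", statuses)
    println(dirs.map(_._2.path))  // directories to recurse into
    println(files.map(_._2.path)) // leaf files
  }
}
```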
let's wait for @mallman's response to make sure this patch does fix the problem
Hi @bbossy,
It does not match my scenario. I'm reading files from HDFS. In your test, you're reading files from the local filesystem. Can you try a test using files stored in HDFS? Also, I'm not testing with
gentle ping @bbossy, I just want to check whether this is still in progress.
@HyukjinKwon I'll see if I can address the outstanding review comments in the next day or two.
## What changes were proposed in this pull request?

This PR proposes to close stale PRs, mostly the same instances with apache#18017.

Closes apache#14085 - [SPARK-16408][SQL] SparkSQL Added file get Exception: is a directory …
Closes apache#14239 - [SPARK-16593] [CORE] [WIP] Provide a pre-fetch mechanism to accelerate shuffle stage.
Closes apache#14567 - [SPARK-16992][PYSPARK] Python Pep8 formatting and import reorganisation
Closes apache#14579 - [SPARK-16921][PYSPARK] RDD/DataFrame persist()/cache() should return Python context managers
Closes apache#14601 - [SPARK-13979][Core] Killed executor is re spawned without AWS key…
Closes apache#14830 - [SPARK-16992][PYSPARK][DOCS] import sort and autopep8 on Pyspark examples
Closes apache#14963 - [SPARK-16992][PYSPARK] Virtualenv for Pylint and pep8 in lint-python
Closes apache#15227 - [SPARK-17655][SQL] Remove unused variables declarations and definations in a WholeStageCodeGened stage
Closes apache#15240 - [SPARK-17556] [CORE] [SQL] Executor side broadcast for broadcast joins
Closes apache#15405 - [SPARK-15917][CORE] Added support for number of executors in Standalone [WIP]
Closes apache#16099 - [SPARK-18665][SQL] set statement state to "ERROR" after user cancel job
Closes apache#16445 - [SPARK-19043][SQL] Make SparkSQLSessionManager more configurable
Closes apache#16618 - [SPARK-14409][ML][WIP] Add RankingEvaluator
Closes apache#16766 - [SPARK-19426][SQL] Custom coalesce for Dataset
Closes apache#16832 - [SPARK-19490][SQL] ignore case sensitivity when filtering hive partition columns
Closes apache#17052 - [SPARK-19690][SS] Join a streaming DataFrame with a batch DataFrame which has an aggregation may not work
Closes apache#17267 - [SPARK-19926][PYSPARK] Make pyspark exception more user-friendly
Closes apache#17371 - [SPARK-19903][PYSPARK][SS] window operator miss the `watermark` metadata of time column
Closes apache#17401 - [SPARK-18364][YARN] Expose metrics for YarnShuffleService
Closes apache#17519 - [SPARK-15352][Doc] follow-up: add configuration docs for topology-aware block replication
Closes apache#17530 - [SPARK-5158] Access kerberized HDFS from Spark standalone
Closes apache#17854 - [SPARK-20564][Deploy] Reduce massive executor failures when executor count is large (>2000)
Closes apache#17979 - [SPARK-19320][MESOS][WIP] allow specifying a hard limit on number of gpus required in each spark executor when running on mesos
Closes apache#18127 - [SPARK-6628][SQL][Branch-2.1] Fix ClassCastException when executing sql statement 'insert into' on hbase table
Closes apache#18236 - [SPARK-21015] Check field name is not null and empty in GenericRowWit…
Closes apache#18269 - [SPARK-21056][SQL] Use at most one spark job to list files in InMemoryFileIndex
Closes apache#18328 - [SPARK-21121][SQL] Support changing storage level via the spark.sql.inMemoryColumnarStorage.level variable
Closes apache#18354 - [SPARK-18016][SQL][CATALYST][BRANCH-2.1] Code Generation: Constant Pool Limit - Class Splitting
Closes apache#18383 - [SPARK-21167][SS] Set kafka clientId while fetch messages
Closes apache#18414 - [SPARK-21169] [core] Make sure to update application status to RUNNING if executors are accepted and RUNNING after recovery
Closes apache#18432 - resolve com.esotericsoftware.kryo.KryoException
Closes apache#18490 - [SPARK-21269][Core][WIP] Fix FetchFailedException when enable maxReqSizeShuffleToMem and KryoSerializer
Closes apache#18585 - SPARK-21359
Closes apache#18609 - Spark SQL merge small files to big files Update InsertIntoHiveTable.scala

Added:
Closes apache#18308 - [SPARK-21099][Spark Core] INFO Log Message Using Incorrect Executor I…
Closes apache#18599 - [SPARK-21372] spark writes one log file even I set the number of spark_rotate_log to 0
Closes apache#18619 - [SPARK-21397][BUILD] Maven shade plugin adding dependency-reduced-pom.xml to …
Closes apache#18667 - Fix the simpleString used in error messages
Closes apache#18782 - Branch 2.1

Added:
Closes apache#17694 - [SPARK-12717][PYSPARK] Resolving race condition with pyspark broadcasts when using multiple threads

Added:
Closes apache#16456 - [SPARK-18994] clean up the local directories for application in future by annother thread
Closes apache#18683 - [SPARK-21474][CORE] Make number of parallel fetches from a reducer configurable
Closes apache#18690 - [SPARK-21334][CORE] Add metrics reporting service to External Shuffle Server

Added:
Closes apache#18827 - Merge pull request 1 from apache/master

## How was this patch tested?

N/A

Author: hyukjinkwon <gurwls223@gmail.com>

Closes apache#18780 from HyukjinKwon/close-prs.


What changes were proposed in this pull request?
This PR changes InMemoryFileIndex.listLeafFiles behaviour to launch at most one spark job to list leaf files.

JIRA: https://issues.apache.org/jira/browse/SPARK-21056
Given a partitioned file relation (e.g. parquet):

root/a=../b=../c=..

InMemoryFileIndex.listLeafFiles runs numberOfPartitions(a) times numberOfPartitions(b) spark jobs sequentially to list leaf files, if both numberOfPartitions(a) and numberOfPartitions(b) are below spark.sql.sources.parallelPartitionDiscovery.threshold and numberOfPartitions(c) is above spark.sql.sources.parallelPartitionDiscovery.threshold. Since the jobs are run sequentially, the per-job overhead dominates and the file listing operation can become significantly slower than listing the files from the driver.
I propose that InMemoryFileIndex.listLeafFiles should launch at most one spark job for listing leaf files.
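A rough sketch of the proposed control flow (hypothetical, not the actual patch: `parallelThreshold` stands in for `spark.sql.sources.parallelPartitionDiscovery.threshold`, and `listLeafFilesInParallel` for Spark's existing distributed listing helper):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

object SingleJobListingSketch {
  // Assumed helper: lists all given directories in ONE distributed job.
  def listLeafFilesInParallel(paths: Seq[Path], hadoopConf: Configuration): Seq[FileStatus] = ???

  // Recurse on the driver while the directory count stays small; once the
  // threshold is crossed, hand the whole remaining batch to a single spark
  // job instead of launching one job per directory level.
  def listLeafFiles(paths: Seq[Path], hadoopConf: Configuration,
                    parallelThreshold: Int): Seq[FileStatus] =
    if (paths.isEmpty) {
      Nil
    } else {
      val fs = paths.head.getFileSystem(hadoopConf)
      val statuses = paths.flatMap(fs.listStatus(_))
      val (dirs, files) = statuses.partition(_.isDirectory)
      val dirPaths = dirs.map(_.getPath)
      val nested =
        if (dirPaths.length > parallelThreshold)
          listLeafFilesInParallel(dirPaths, hadoopConf) // at most one job
        else
          listLeafFiles(dirPaths, hadoopConf, parallelThreshold)
      files ++ nested
    }
}
```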
How was this patch tested?
Adapted existing tests to match expected behaviour.