[SPARK-32381][CORE][SQL][FOLLOWUP] More cleanup on HadoopFSUtils
### What changes were proposed in this pull request?

This PR is a follow-up of #29471 and makes the following improvements to `HadoopFSUtils`:
1. Removes the extra `filterFun` parameter from the listing API and combines it with the `filter` argument.
2. Removes `SerializableBlockLocation` and `SerializableFileStatus`, given that `BlockLocation` and `FileStatus` are already serializable.
3. Hides the `isRootLevel` flag from the top-level API; the resulting entry point is sketched after this list.
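For reference, the resulting top-level entry point (copied from the diff below) no longer takes `isRootLevel` or `filterFun`; it simply forwards to the private implementation with `isRootLevel = true`:

```scala
def parallelListLeafFiles(
    sc: SparkContext,
    paths: Seq[Path],
    hadoopConf: Configuration,
    filter: PathFilter,   // name-based filtering is now folded into this PathFilter
    ignoreMissingFiles: Boolean,
    ignoreLocality: Boolean,
    parallelismThreshold: Int,
    parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
  parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, isRootLevel = true,
    ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax)
}
```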

### Why are the changes needed?

The main purpose is to simplify the logic within `HadoopFSUtils` and to clean up its API.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests (e.g., `FileIndexSuite`)

Closes #29959 from sunchao/hadoop-fs-utils-followup.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
sunchao authored and holdenk committed Nov 18, 2020
1 parent dcac78e commit 27cd945
Showing 3 changed files with 31 additions and 94 deletions.
core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala (19 additions, 85 deletions)
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.viewfs.ViewFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem

import org.apache.spark._
import org.apache.spark.annotation.Private
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics

@@ -45,8 +44,6 @@ private[spark] object HadoopFSUtils extends Logging {
* @param paths Input paths to list
* @param hadoopConf Hadoop configuration
* @param filter Path filter used to exclude leaf files from result
* @param isRootLevel Whether the input paths are at the root level, i.e., they are the root
* paths as opposed to nested paths encountered during recursive calls of this.
* @param ignoreMissingFiles Ignore missing files that happen during recursive listing
* (e.g., due to race conditions)
* @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false,
@@ -57,11 +54,22 @@ private[spark] object HadoopFSUtils extends Logging {
* @param parallelismMax The maximum parallelism for listing. If the number of input paths is
* larger than this value, parallelism will be throttled to this value
* to avoid generating too many tasks.
* @param filterFun Optional predicate on the leaf files. Files who failed the check will be
* excluded from the results
* @return for each input path, the set of discovered files for the path
*/
def parallelListLeafFiles(
sc: SparkContext,
paths: Seq[Path],
hadoopConf: Configuration,
filter: PathFilter,
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
parallelismThreshold: Int,
parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, isRootLevel = true,
ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax)
}

private def parallelListLeafFilesInternal(
sc: SparkContext,
paths: Seq[Path],
hadoopConf: Configuration,
@@ -70,8 +78,7 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
parallelismThreshold: Int,
parallelismMax: Int,
filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = {
parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {

// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= parallelismThreshold) {
@@ -85,8 +92,7 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreLocality = ignoreLocality,
isRootPath = isRootLevel,
parallelismThreshold = parallelismThreshold,
parallelismMax = parallelismMax,
filterFun = filterFun)
parallelismMax = parallelismMax)
(path, leafFiles)
}
}
@@ -126,58 +132,16 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isRootPath = isRootLevel,
filterFun = filterFun,
parallelismThreshold = Int.MaxValue,
parallelismMax = 0)
(path, leafFiles)
}.iterator
}.map { case (path, statuses) =>
val serializableStatuses = statuses.map { status =>
// Turn FileStatus into SerializableFileStatus so we can send it back to the driver
val blockLocations = status match {
case f: LocatedFileStatus =>
f.getBlockLocations.map { loc =>
SerializableBlockLocation(
loc.getNames,
loc.getHosts,
loc.getOffset,
loc.getLength)
}

case _ =>
Array.empty[SerializableBlockLocation]
}

SerializableFileStatus(
status.getPath.toString,
status.getLen,
status.isDirectory,
status.getReplication,
status.getBlockSize,
status.getModificationTime,
status.getAccessTime,
blockLocations)
}
(path.toString, serializableStatuses)
}.collect()
} finally {
sc.setJobDescription(previousJobDescription)
}

// turn SerializableFileStatus back to Status
statusMap.map { case (path, serializableStatuses) =>
val statuses = serializableStatuses.map { f =>
val blockLocations = f.blockLocations.map { loc =>
new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
}
new LocatedFileStatus(
new FileStatus(
f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
new Path(f.path)),
blockLocations)
}
(new Path(path), statuses)
}
statusMap.toSeq
}

// scalastyle:off argcount
@@ -197,7 +161,6 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
isRootPath: Boolean,
filterFun: Option[String => Boolean],
parallelismThreshold: Int,
parallelismMax: Int): Seq[FileStatus] = {

@@ -245,27 +208,18 @@ private[spark] object HadoopFSUtils extends Logging {
Array.empty[FileStatus]
}

def doFilter(statuses: Array[FileStatus]) = filterFun match {
case Some(shouldFilterOut) =>
statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
case None =>
statuses
}

val filteredStatuses = doFilter(statuses)
val allLeafStatuses = {
val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
val nestedFiles: Seq[FileStatus] = contextOpt match {
case Some(context) if dirs.size > parallelismThreshold =>
parallelListLeafFiles(
parallelListLeafFilesInternal(
context,
dirs.map(_.getPath),
hadoopConf = hadoopConf,
filter = filter,
isRootLevel = false,
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
filterFun = filterFun,
parallelismThreshold = parallelismThreshold,
parallelismMax = parallelismMax
).flatMap(_._2)
@@ -279,7 +233,6 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isRootPath = false,
filterFun = filterFun,
parallelismThreshold = parallelismThreshold,
parallelismMax = parallelismMax)
}
@@ -289,8 +242,7 @@ private[spark] object HadoopFSUtils extends Logging {
}

val missingFiles = mutable.ArrayBuffer.empty[String]
val filteredLeafStatuses = doFilter(allLeafStatuses)
val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
val resolvedLeafStatuses = allLeafStatuses.flatMap {
case f: LocatedFileStatus =>
Some(f)

@@ -339,22 +291,4 @@ private[spark] object HadoopFSUtils extends Logging {
resolvedLeafStatuses
}
// scalastyle:on argcount

/** A serializable variant of HDFS's BlockLocation. */
private case class SerializableBlockLocation(
names: Array[String],
hosts: Array[String],
offset: Long,
length: Long)

/** A serializable variant of HDFS's FileStatus. */
private case class SerializableFileStatus(
path: String,
length: Long,
isDir: Boolean,
blockReplication: Short,
blockSize: Long,
modificationTime: Long,
accessTime: Long,
blockLocations: Array[SerializableBlockLocation])
}
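A hedged usage sketch of the simplified API (not part of this commit): `HadoopFSUtils` is `private[spark]`, so this only applies to Spark-internal callers, and the filter, paths, and thresholds below are illustrative assumptions.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.spark.SparkContext
import org.apache.spark.util.HadoopFSUtils

def listLeafFiles(sc: SparkContext, roots: Seq[Path], conf: Configuration): Unit = {
  // The filter is captured in the listing job's closure, so it must be Serializable.
  val filter = new PathFilter with Serializable {
    override def accept(p: Path): Boolean = !p.getName.startsWith("_temporary")
  }
  val listed = HadoopFSUtils.parallelListLeafFiles(
    sc, roots, conf, filter,
    ignoreMissingFiles = true,
    ignoreLocality = false,
    parallelismThreshold = 32,   // below this many paths, list serially on the driver
    parallelismMax = 10000)      // cap on the parallelism of the distributed listing
  listed.foreach { case (root, files) =>
    println(s"$root: ${files.size} leaf files")
  }
}
```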
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -163,7 +163,7 @@ object CommandUtils extends Logging {
.getConfString("hive.exec.stagingdir", ".hive-staging")
val filter = new PathFilterIgnoreNonData(stagingDir)
val sizes = InMemoryFileIndex.bulkListLeafFiles(paths.flatten,
sparkSession.sessionState.newHadoopConf(), filter, sparkSession, isRootLevel = true).map {
sparkSession.sessionState.newHadoopConf(), filter, sparkSession).map {
case (_, files) => files.map(_.getLen).sum
}
// the size is 0 where paths(i) is not defined and sizes(i) where it is defined
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -128,7 +128,7 @@ class InMemoryFileIndex(
}
val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
val discovered = InMemoryFileIndex.bulkListLeafFiles(
pathsToFetch.toSeq, hadoopConf, filter, sparkSession, isRootLevel = true)
pathsToFetch.toSeq, hadoopConf, filter, sparkSession)
discovered.foreach { case (path, leafFiles) =>
HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -146,20 +146,17 @@ object InMemoryFileIndex extends Logging {
paths: Seq[Path],
hadoopConf: Configuration,
filter: PathFilter,
sparkSession: SparkSession,
isRootLevel: Boolean): Seq[(Path, Seq[FileStatus])] = {
sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
HadoopFSUtils.parallelListLeafFiles(
sc = sparkSession.sparkContext,
paths = paths,
hadoopConf = hadoopConf,
filter = filter,
isRootLevel = isRootLevel,
filter = new PathFilterWrapper(filter),
ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles,
ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism,
filterFun = Some(shouldFilterOut))
}
parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
}

/** Checks if we should filter out this path name. */
def shouldFilterOut(pathName: String): Boolean = {
@@ -175,3 +172,9 @@ object InMemoryFileIndex extends Logging {
exclude && !include
}
}

private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable {
override def accept(path: Path): Boolean = {
(filter == null || filter.accept(path)) && !InMemoryFileIndex.shouldFilterOut(path.getName)
}
}
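This wrapper is what enables change 1 above: the name-based `shouldFilterOut` check that used to travel as a separate `filterFun` is now folded into a single serializable `PathFilter`. A small illustration of the assumed behavior (not part of the diff; it relies on the usual `shouldFilterOut` rules, which exclude names starting with `_` or `.` except `_metadata`-style files):

```scala
// With a null user filter, only the built-in name check applies.
val wrapped = new PathFilterWrapper(null)
wrapped.accept(new Path("/data/part-00000.parquet"))  // true
wrapped.accept(new Path("/data/_temporary"))          // false: leading "_" names are excluded
```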
