[SPARK-17661][SQL] Consolidate various listLeafFiles implementations #15235
Conversation
@yhuai I think you wrote most of this. Can you take a look?
// well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`.
// Here we use `SerializableFileStatus` to extract key components of a `FileStatus`, serialize
// it on the executor side, and reconstruct it on the driver side.
private case class SerializableBlockLocation(
I renamed this from "Fake" to "Serializable" to more accurately describe its purpose.
+1
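For reference, a minimal sketch of what these two helper classes look like (field names assumed; the actual PR may differ in detail):

```scala
// Plain case classes are Java-serializable, unlike Hadoop's FileStatus,
// so executors can ship these back to the driver, which then rebuilds
// real FileStatus/BlockLocation objects from them.
private case class SerializableBlockLocation(
    names: Array[String],
    hosts: Array[String],
    offset: Long,
    length: Long)

private case class SerializableFileStatus(
    path: String,
    length: Long,
    isDir: Boolean,
    blockReplication: Short,
    blockSize: Long,
    modificationTime: Long,
    accessTime: Long,
    blockLocations: Array[SerializableBlockLocation])
```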
.parallelize(serializedPaths, numParallelism)
.mapPartitions { paths =>
  val hadoopConf = serializableConfiguration.value
  listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
This function is very similar to the old listLeafFilesInParallel, except that I replaced the body of this mapPartitions call with listLeafFilesInSerial.
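Roughly, the parallel path now just distributes the single serial implementation. A sketch, assuming `listLeafFilesInSerial` from this PR, a hypothetical `toSerializableFileStatus` conversion helper, and Spark's internal `SerializableConfiguration` wrapper:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.util.SerializableConfiguration

def listLeafFilesInParallel(
    paths: Seq[Path],
    hadoopConf: Configuration,
    sc: SparkContext,
    numParallelism: Int): Seq[SerializableFileStatus] = {
  // Ship the Hadoop conf once per task rather than once per path.
  val serializableConfiguration = new SerializableConfiguration(hadoopConf)
  val serializedPaths = paths.map(_.toString)
  sc.parallelize(serializedPaths, numParallelism)
    .mapPartitions { paths =>
      val hadoopConf = serializableConfiguration.value
      // The same serial implementation used on the driver for small path sets.
      listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
    }
    // FileStatus is not serializable, so convert before collecting.
    .map(status => toSerializableFileStatus(status))
    .collect()
}
```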
/**
 * List a single path, provided as a FileStatus, in serial.
 */
private def listLeafFiles0(
This is almost the same as the old HadoopFsRelation.listLeafFiles. The old code was:
def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
  logTrace(s"Listing ${status.getPath}")
  val name = status.getPath.getName.toLowerCase
  if (shouldFilterOut(name)) {
    Array.empty[FileStatus]
  } else {
    val statuses = {
      val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
      val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))
      if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
    }
    // statuses do not have any dirs.
    statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
      case f: LocatedFileStatus => f

      // NOTE:
      //
      // - Although the S3/S3A/S3N file systems can be quite slow for remote file metadata
      //   operations, calling `getFileBlockLocations` does no harm here since these file system
      //   implementations don't actually issue RPCs for this method.
      //
      // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
      //   be a big deal since we always use `listLeafFilesInParallel` when the number of
      //   paths exceeds the threshold.
      case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
    }
  }
}

def createLocatedFileStatus(f: FileStatus, locations: Array[BlockLocation]): LocatedFileStatus = {
  // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), which is
  // very slow on some file systems (e.g. RawLocalFileSystem, which launches a subprocess and
  // parses the stdout).
  val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
    f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
  if (f.isSymlink) {
    lfs.setSymlink(f.getSymlink)
  }
  lfs
}
override def hashCode(): Int = paths.toSet.hashCode()

/** Checks if we should filter out this path name. */
def shouldFilterOut(pathName: String): Boolean = {
this is identical to the old code in HadoopFsRelation.shouldFilterOut
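For context, the predicate is tiny; it looks roughly like this in the Spark source of that era:

```scala
/** Checks if we should filter out this path name. */
def shouldFilterOut(pathName: String): Boolean = {
  // Hide names starting with "_" or ".", except Parquet's summary metadata
  // files, which downstream Parquet reading still needs to see.
  (pathName.startsWith("_") || pathName.startsWith(".")) &&
    !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
}
```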
class ListingFileCatalogSuite extends SparkFunSuite {

  test("file filtering") {
this was moved from HadoopFsRelationSuite without any change.
you may add this test to FileCatalogSuite if you like
That's counterintuitive. The code is defined in ListingFileCatalog, and should be tested in ListingFileCatalogSuite.
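For reference, the moved test exercises the predicate directly; it reads along these lines:

```scala
import org.apache.spark.SparkFunSuite

class ListingFileCatalogSuite extends SparkFunSuite {
  test("file filtering") {
    assert(!ListingFileCatalog.shouldFilterOut("abcd"))
    assert(ListingFileCatalog.shouldFilterOut(".ab"))
    assert(ListingFileCatalog.shouldFilterOut("_cd"))
    assert(!ListingFileCatalog.shouldFilterOut("_metadata"))
    assert(!ListingFileCatalog.shouldFilterOut("_common_metadata"))
    assert(ListingFileCatalog.shouldFilterOut("_ab_metadata"))
    assert(ListingFileCatalog.shouldFilterOut("_cd_common_metadata"))
  }
}
```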
Test build #65876 has finished for PR 15235 at commit
Test build #65888 has finished for PR 15235 at commit
My main comment is that the try-catch block for SPARK-17599 is in the wrong place.
object ListingFileCatalog extends Logging {
// `FileStatus` is Writable but not serializable. What makes it worse, somehow it doesn't play
existing, but good to fix: the comment belongs on the class below, not SerializableBlockLocation
Moved
val fs = path.getFileSystem(hadoopConf)

// [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
val status: Option[FileStatus] = try Option(fs.getFileStatus(path)) catch {
I don't think you need this. You are increasing the number of getStatus calls that we need to make. There is no guarantee that the folder will exist once listLeafFiles0 is called.
Let me take a look at this. This is actually consistent with the old code (for the parallel version). It is actually slightly tricky to remove this.
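For context, the guard under discussion looks roughly like this (the exact log message is an assumption; `fs`, `path`, and `logWarning` come from the enclosing class):

```scala
import java.io.FileNotFoundException
import org.apache.hadoop.fs.FileStatus

// [SPARK-17599] A path that doesn't exist (or vanished mid-listing) yields
// an empty result instead of failing the whole catalog refresh.
val status: Option[FileStatus] =
  try Option(fs.getFileStatus(path)) catch {
    case _: FileNotFoundException =>
      logWarning(s"The directory $path was not found. Was it deleted very recently?")
      None
  }
```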
.mapPartitions { paths =>
  val hadoopConf = serializableConfiguration.value
  listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
}.map { status =>
why don't you just call map on the iterator instead of calling it on the RDD?
This was pre-existing code.
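For what it's worth, the suggestion amounts to fusing the conversion into the same partition pass, sketched below with the hypothetical `toSerializableFileStatus` helper again:

```scala
.mapPartitions { paths =>
  val hadoopConf = serializableConfiguration.value
  listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf)
    .iterator
    .map(toSerializableFileStatus) // map the iterator directly; no separate rdd.map
}
```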
.parallelize(serializedPaths, numParallelism)
.mapPartitions { paths =>
  val hadoopConf = serializableConfiguration.value
  listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
you shouldn't just call listLeafFilesInSerial here. It's more likely that one level down you're going to have a bunch more directories that you may want to list, where you want more parallelization. You should iteratively list subdirectories in parallel.
This was following the old behavior.
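For illustration only (this PR keeps the old behavior), an iterative level-by-level scheme might look like this sketch; `listInParallel` is a placeholder for a distributed listing of one directory level:

```scala
import scala.annotation.tailrec
import org.apache.hadoop.fs.{FileStatus, Path}

// Placeholder: list one level of directories using a distributed job.
def listInParallel(dirs: Seq[Path]): Seq[FileStatus] = ???

// Breadth-first: list the current frontier in parallel, queue up any
// subdirectories found, and repeat until no directories remain.
@tailrec
def listIteratively(dirs: Seq[Path], acc: Seq[FileStatus]): Seq[FileStatus] = {
  if (dirs.isEmpty) {
    acc
  } else {
    val statuses = listInParallel(dirs)
    val (subDirs, files) = statuses.partition(_.isDirectory)
    listIteratively(subDirs.map(_.getPath), acc ++ files)
  }
}
```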
} else {
  val statuses = {
    val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
    val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir, filter))
I think the directories should be submittable as a parallel job if we were told that we should parallelize file listing.
I see the old code path was just like this; maybe it's not necessary.
Yea I don't see why we want to change all of these in this pull request, unless they are a problem.
  Seq.empty[FileStatus]
} else {
  val statuses = {
    val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
this is actually where you need the try-catch block to see if the file exists or not.
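i.e., moving the guard onto the listing call itself, roughly:

```scala
import java.io.FileNotFoundException
import org.apache.hadoop.fs.FileStatus

// The directory can disappear between getFileStatus and listStatus, so the
// listing call is the one that needs protection.
val childStatuses: Array[FileStatus] =
  try fs.listStatus(status.getPath) catch {
    case _: FileNotFoundException => Array.empty[FileStatus]
  }
val (dirs, files) = childStatuses.partition(_.isDirectory)
```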
I pushed a new version that should address all the outstanding issues.
Test build #65936 has finished for PR 15235 at commit
@brkyvz does this look good?
@petermaxlee Thanks for making the change. This LGTM now that the fix for SPARK-17599 is in the right place.
Merging in master. Thanks! |
## What changes were proposed in this pull request?

There are 4 listLeafFiles-related functions in Spark:

- ListingFileCatalog.listLeafFiles (which calls HadoopFsRelation.listLeafFilesInParallel if the number of paths passed in is greater than a threshold; if it is lower, then it has its own serial version implemented)
- HadoopFsRelation.listLeafFiles (called only by HadoopFsRelation.listLeafFilesInParallel)
- HadoopFsRelation.listLeafFilesInParallel (called only by ListingFileCatalog.listLeafFiles)

It is actually very confusing and error prone because there are effectively two distinct implementations for the serial version of listing leaf files. As an example, SPARK-17599 updated only one of the code paths and ignored the other one.

This code can be improved by:

- Moving all file listing code into ListingFileCatalog, since it is the only class that needs this.
- Keeping only one function for listing files in serial.

## How was this patch tested?

This change should be covered by existing unit and integration tests. I also moved a test case for HadoopFsRelation.shouldFilterOut from HadoopFsRelationSuite to ListingFileCatalogSuite.
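Put together, the consolidated shape is roughly the following sketch (the config accessor and session-state names are assumptions based on the Spark source of that era; `listLeafFilesInParallel` and `listLeafFilesInSerial` are the two functions this PR keeps):

```scala
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.SparkSession

// Single entry point: one serial implementation, with the parallel path
// merely distributing that same implementation across executors.
def listLeafFiles(paths: Seq[Path], sparkSession: SparkSession): Seq[FileStatus] = {
  val hadoopConf = sparkSession.sessionState.newHadoopConf()
  val threshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold
  if (paths.length >= threshold) {
    // Distributed listing; FileStatus objects are rebuilt on the driver
    // from their serializable form.
    listLeafFilesInParallel(paths, hadoopConf, sparkSession)
  } else {
    listLeafFilesInSerial(paths, hadoopConf)
  }
}
```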