
[SPARK-17661][SQL] Consolidate various listLeafFiles implementations #15235


Closed
wants to merge 3 commits

Conversation

@petermaxlee (Contributor) commented Sep 25, 2016

What changes were proposed in this pull request?

There are 4 listLeafFiles-related functions in Spark:

  • ListingFileCatalog.listLeafFiles (which calls HadoopFsRelation.listLeafFilesInParallel if the number of paths passed in exceeds a threshold; otherwise it has its own inline serial implementation)
  • HadoopFsRelation.listLeafFiles (called only by HadoopFsRelation.listLeafFilesInParallel)
  • HadoopFsRelation.listLeafFilesInParallel (called only by ListingFileCatalog.listLeafFiles)

This is confusing and error-prone because there are effectively two distinct implementations of the serial version of listing leaf files. As an example, SPARK-17599 updated only one of the code paths and ignored the other.

This code can be improved by:

  • Moving all file listing code into ListingFileCatalog, since it is the only class that needs it (a sketch of the resulting shape follows this list).
  • Keeping only one function for listing files in serial.
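For orientation, here is a minimal sketch of the consolidated shape under this proposal. The signatures are assumptions for illustration, not the exact merged code:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.spark.sql.SparkSession

// Sketch only: all listing logic lives in one place, with a single serial
// implementation that the parallel path reuses inside each task.
object ListingFileCatalog {
  /** Entry point when there are many paths: distributes the work as a Spark job. */
  def listLeafFilesInParallel(
      paths: Seq[Path],
      hadoopConf: Configuration,
      sparkSession: SparkSession): Seq[FileStatus] = ???

  /** The single serial implementation; also runs inside each parallel task. */
  def listLeafFilesInSerial(
      paths: Seq[Path],
      hadoopConf: Configuration): Seq[FileStatus] = ???

  /** Lists a single path, provided as a FileStatus, recursing into subdirectories. */
  private def listLeafFiles0(
      fs: FileSystem,
      status: FileStatus,
      filter: PathFilter): Seq[FileStatus] = ???
}
```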

How was this patch tested?

This change should be covered by existing unit and integration tests. I also moved a test case for HadoopFsRelation.shouldFilterOut from HadoopFsRelationSuite to ListingFileCatalogSuite.

@petermaxlee (Contributor, Author)

@yhuai I think you wrote most of this. Can you take a look?

// well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`.
// Here we use `SerializableFileStatus` to extract key components of a `FileStatus` to serialize
// it from executor side and reconstruct it on driver side.
private case class SerializableBlockLocation(
@petermaxlee (Contributor, Author)

I renamed this from "Fake" to "Serializable" to more accurately describe its purpose.

Contributor:

+1
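For readers without the full diff open, the wrappers under discussion look roughly like this (the field lists are my best reading of the diff; treat them as illustrative):

```scala
// `FileStatus` is Writable but not java.io.Serializable, so the key fields are
// copied into plain case classes on the executor side and a `FileStatus` is
// reconstructed on the driver side.
private case class SerializableBlockLocation(
    names: Array[String],
    hosts: Array[String],
    offset: Long,
    length: Long)

private case class SerializableFileStatus(
    path: String,
    length: Long,
    isDir: Boolean,
    blockReplication: Short,
    blockSize: Long,
    modificationTime: Long,
    accessTime: Long,
    blockLocations: Array[SerializableBlockLocation])
```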

.parallelize(serializedPaths, numParallelism)
.mapPartitions { paths =>
val hadoopConf = serializableConfiguration.value
listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
@petermaxlee (Contributor, Author)

This function is very similar to the old listLeafFilesInParallel, except that I replaced the code inside this mapPartitions call with listLeafFilesInSerial.
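Pieced together, the driver-side pattern is roughly the following sketch. `listLeafFilesInSerial` is the function from this diff; `toSerializableFileStatus`, `numParallelism`, and the enclosing method shape are assumptions for illustration:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.util.SerializableConfiguration

// Sketch: fan the input paths out across the cluster; each task runs the
// single serial listing implementation over its slice of paths.
def sketchListLeafFilesInParallel(
    sc: SparkContext,
    paths: Seq[Path],
    hadoopConf: Configuration,
    numParallelism: Int): Array[SerializableFileStatus] = {
  val serializedPaths = paths.map(_.toString)  // Path itself is not serializable
  val serializableConfiguration = new SerializableConfiguration(hadoopConf)
  sc.parallelize(serializedPaths, numParallelism)
    .mapPartitions { paths =>
      val hadoopConf = serializableConfiguration.value
      listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
    }
    .map(toSerializableFileStatus)  // hypothetical conversion; see the review thread below
    .collect()
}
```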

/**
* List a single path, provided as a FileStatus, in serial.
*/
private def listLeafFiles0(
@petermaxlee (Contributor, Author)

This is almost the same as the old HadoopFsRelation.listLeafFiles. The old code was:

  def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
    logTrace(s"Listing ${status.getPath}")
    val name = status.getPath.getName.toLowerCase
    if (shouldFilterOut(name)) {
      Array.empty[FileStatus]
    } else {
      val statuses = {
        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
        val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))
        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
      }
      // statuses do not have any dirs.
      statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
        case f: LocatedFileStatus => f

        // NOTE:
        //
        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
        //   operations, calling `getFileBlockLocations` does no harm here since these file system
        //   implementations don't actually issue RPC for this method.
        //
        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
        //   be a big deal since we always use `listLeafFilesInParallel` when the number of
        //   paths exceeds the threshold.
        case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
      }
    }
  }

  def createLocatedFileStatus(f: FileStatus, locations: Array[BlockLocation]): LocatedFileStatus = {
    // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), which is
    // very slow on some file systems (RawLocalFileSystem launches a subprocess and parses the
    // stdout).
    val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
      f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
    if (f.isSymlink) {
      lfs.setSymlink(f.getSymlink)
    }
    lfs
  }


override def hashCode(): Int = paths.toSet.hashCode()
/** Checks if we should filter out this path name. */
def shouldFilterOut(pathName: String): Boolean = {
@petermaxlee (Contributor, Author)

This is identical to the old code in HadoopFsRelation.shouldFilterOut.
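For reference, the logic being moved reads roughly like this sketch (reconstructed from memory of the 2.0-era code, not a verbatim quote):

```scala
/** Checks if we should filter out this path name. */
def shouldFilterOut(pathName: String): Boolean = {
  // Filter out names starting with "_" or ".", except the Parquet summary
  // files "_metadata" and "_common_metadata", which readers need to find.
  (pathName.startsWith("_") || pathName.startsWith(".")) &&
    !pathName.startsWith("_common_metadata") &&
    !pathName.startsWith("_metadata")
}
```

So, for example, `_temporary` and `.hidden` are filtered out, while `_metadata` and `_common_metadata` survive.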


class ListingFileCatalogSuite extends SparkFunSuite {

test("file filtering") {
@petermaxlee (Contributor, Author) commented Sep 25, 2016

This was moved from HadoopFsRelationSuite without any change.

Contributor:

You may add this test to FileCatalogSuite if you like.

@petermaxlee (Contributor, Author)

That's counterintuitive. The code is defined in ListingFileCatalog, and should be tested in ListingFileCatalogSuite.

@SparkQA commented Sep 25, 2016

Test build #65876 has finished for PR 15235 at commit 2a76ec1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@petermaxlee (Contributor, Author)

@brkyvz I think this also impacts the change you just made in #15153. This change makes both code paths consistent.

@SparkQA commented Sep 25, 2016

Test build #65888 has finished for PR 15235 at commit 5c6a640.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@brkyvz (Contributor) left a review:

My main comment is that the try-catch block for SPARK-17599 is in the wrong place.


object ListingFileCatalog extends Logging {

// `FileStatus` is Writable but not serializable. What makes it worse, somehow it doesn't play
Contributor:

Existing issue, but good to fix: the comment belongs on the class below, not on SerializableBlockLocation.

@petermaxlee (Contributor, Author)

Moved.

val fs = path.getFileSystem(hadoopConf)

// [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
val status: Option[FileStatus] = try Option(fs.getFileStatus(path)) catch {
Contributor:

I don't think you need this. You are increasing the number of getFileStatus calls that we need to make. There is no guarantee that the folder will exist once listLeafFiles0 is called.

@petermaxlee (Contributor, Author)

Let me take a look at this. This is actually consistent with the old code (for the parallel version), and it is slightly tricky to remove.
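For context, the guard shown in the snippet above completes roughly like this (the catch clause and log message are assumptions based on SPARK-17599's intent; `fs`, `path`, and `logWarning` come from the enclosing scope):

```scala
import java.io.FileNotFoundException

// [SPARK-17599] A root path may have been deleted between planning and
// listing; treat a missing path as "no files" instead of failing the query.
val status: Option[FileStatus] = try Option(fs.getFileStatus(path)) catch {
  case _: FileNotFoundException =>
    logWarning(s"The directory $path was not found. Was it deleted very recently?")
    None
}
```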

.mapPartitions { paths =>
val hadoopConf = serializableConfiguration.value
listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
}.map { status =>
Contributor:

Why don't you call map on the iterator instead of calling it on the RDD?

@petermaxlee (Contributor, Author)

This was pre-existing code.
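As a sketch, the reviewer's suggestion would fold the conversion into the same partition-level pass, mapping over the iterator rather than adding another RDD transformation. `toSerializableFileStatus` is a hypothetical stand-in for the status conversion done in the diff's RDD-level .map:

```scala
// Sketch of the suggested shape, in the style of the diff fragment above.
.mapPartitions { paths =>
  val hadoopConf = serializableConfiguration.value
  listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf)
    .map(toSerializableFileStatus)  // convert inside the partition pass
    .iterator
}
```

Functionally the two are equivalent, and Spark pipelines both into the same stage; the difference is mainly stylistic, which is why leaving the pre-existing shape alone is defensible.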

.parallelize(serializedPaths, numParallelism)
.mapPartitions { paths =>
val hadoopConf = serializableConfiguration.value
listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
Contributor:

You shouldn't just call listLeafFilesInSerial here. It's likely that one level down you will have many more directories that you may want to list, where you want more parallelism. You should iteratively list subdirectories in parallel.

@petermaxlee (Contributor, Author)

This was following the old behavior.

} else {
val statuses = {
val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir, filter))
Contributor:

I think the directories should be submittable as a parallel job if we were told to parallelize file listing.

Contributor:

I see the old code path was just like this; maybe it's not necessary.

@petermaxlee (Contributor, Author)

Yeah, I don't see why we'd want to change all of these in this pull request unless they're a problem.

Seq.empty[FileStatus]
} else {
val statuses = {
val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
Contributor:

This is actually where you need the try-catch block, to see whether the file exists or not.
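That is, the guard would move to the recursive listing itself, roughly as in this sketch (reconstructed; the log message is an assumption, and `fs`, `status`, and `logWarning` come from the enclosing listLeafFiles0 scope):

```scala
import java.io.FileNotFoundException
import org.apache.hadoop.fs.FileStatus

// A subdirectory can disappear between being discovered and being listed,
// so catch the failure at the listStatus call and treat "gone" as "empty".
val childStatuses: Array[FileStatus] = try fs.listStatus(status.getPath) catch {
  case _: FileNotFoundException =>
    logWarning(s"The directory ${status.getPath} was not found. Was it deleted very recently?")
    Array.empty[FileStatus]
}
val (dirs, files) = childStatuses.partition(_.isDirectory)
```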




@petermaxlee (Contributor, Author)

I pushed a new version that should address all the outstanding issues.

@SparkQA commented Sep 27, 2016

Test build #65936 has finished for PR 15235 at commit 3c99c3c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@petermaxlee (Contributor, Author)

@brkyvz does this look good?

@brkyvz (Contributor) commented Oct 13, 2016

@petermaxlee Thanks for making the change. This LGTM now that the fix for SPARK-17599 is in the right place. The rest is just moving code around and consolidating old code.

@rxin (Contributor) commented Oct 13, 2016

Merging in master. Thanks!

@asfgit closed this in adc1124 on Oct 13, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017