Skip to content

[SPARK-17569] Make StructuredStreaming FileStreamSource batch generation faster #15122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from

Conversation

brkyvz
Copy link
Contributor

@brkyvz brkyvz commented Sep 16, 2016

What changes were proposed in this pull request?

While getting the batch for a FileStreamSource in StructuredStreaming, we know which files we must take specifically. We already have verified that they exist, and have committed them to a metadata log. When creating the FileSourceRelation however for an incremental execution, the code checks the existence of every single file once again!

When you have 100,000s of files in a folder, creating the first batch takes 2 hours+ when working with S3! This PR disables that check

How was this patch tested?

Added a unit test to FileStreamSource.

@brkyvz
Copy link
Contributor Author

brkyvz commented Sep 16, 2016

cc @zsxwing @yhuai

@yhuai
Copy link
Contributor

yhuai commented Sep 16, 2016

also cc @cloud-fan

@SparkQA
Copy link

SparkQA commented Sep 17, 2016

Test build #65512 has finished for PR 15122 at commit 9b7e2de.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@petermaxlee
Copy link
Contributor

Can you test this by deleting the file on purpose, and see what kind of exceptions are thrown?

@petermaxlee
Copy link
Contributor

petermaxlee commented Sep 18, 2016

I looked into this. I think there are two ways that you can intercept any calls to HDFS.

The first way is slightly hacky but pretty simple. FileSystem.addFileSystemForTesting is a package private method that can be used to inject a mock file system. You can create an implementation of FilterFileSystem and pass it in as "file" schema. Then all accesses to local file system will go through your implementation. Of course, you can also use a mocking library to do that, but that is not as clean since FilterFileSystem is a public interface.

The second way is more robust and does not depend on any private APIs. Create an implementation of FilterFileSystem by pointing to LocalFileSystem, e.g. call it MockFileSystem. MockFileSystem.getScheme should return "mockfs://". You can then use this as the path when passing to structured streaming. This is probably a more robust, generic solution.

There is also the possibility of depending on the ordering of how FileSystem and the class loader loads classes -- but I wouldn't recommend that.

@yhuai
Copy link
Contributor

yhuai commented Sep 19, 2016

@petermaxlee I believe you will get a runtime exception saying that the file does not exist.

Also, regarding your options 2, are you suggesting that users of structured streaming to use such a mock fs? Or you are suggesting that structured streaming to use such a fs. Also, why LocalFS is related to this case?

@brkyvz
Copy link
Contributor Author

brkyvz commented Sep 19, 2016

@yhuai The suggestions are for purely testing purposes, to make sure that StructuredStreaming doesn't check for file existence twice.

@brkyvz
Copy link
Contributor Author

brkyvz commented Sep 19, 2016

@petermaxlee Thank you for the suggestions for testing. I will try out Option 1, since 2 is a bit much work for a minor PR as this.

@yhuai
Copy link
Contributor

yhuai commented Sep 19, 2016

ok, got it. Thanks!

@brkyvz
Copy link
Contributor Author

brkyvz commented Sep 19, 2016

Added test using Option 2 in the end.

@SparkQA
Copy link

SparkQA commented Sep 19, 2016

Test build #65604 has finished for PR 15122 at commit 0cf9c08.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem

@SparkQA
Copy link

SparkQA commented Sep 19, 2016

Test build #65605 has finished for PR 15122 at commit 6221b37.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext

@zsxwing
Copy link
Member

zsxwing commented Sep 21, 2016

LGTM. Let's run the test again since there are several PRs about FileStreamSource got merged in the past two days.

@SparkQA
Copy link

SparkQA commented Sep 21, 2016

Test build #3285 has finished for PR 15122 at commit 6221b37.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext

classOf[ExistsThrowsExceptionFileSystem].getName)
// add the metadata entries as a pre-req
val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
val metadataLog = new HDFSMetadataLog[Array[FileEntry]](spark, dir.getAbsolutePath)
Copy link
Member

@zsxwing zsxwing Sep 21, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to use FileStreamSourceLog here to add FileEntry as the latest master uses it to compact File source's logs.

@brkyvz
Copy link
Contributor Author

brkyvz commented Sep 21, 2016

@zsxwing Thanks for the comment. Updated!

@SparkQA
Copy link

SparkQA commented Sep 22, 2016

Test build #65736 has finished for PR 15122 at commit 666c2c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member

zsxwing commented Sep 22, 2016

LGTM. Merging to master and 2.0. Thanks!

@zsxwing
Copy link
Member

zsxwing commented Sep 22, 2016

There are some conflicts with 2.0. Could you submit a patch for branch 2.0?

@asfgit asfgit closed this in 7cbe216 Sep 22, 2016
ghost pushed a commit to dbtsai/spark that referenced this pull request Sep 22, 2016
## What changes were proposed in this pull request?

A [PR](apache@a6aade0) was merged concurrently that made the unit test for PR apache#15122 not test anything anymore. This PR fixes the test.

## How was this patch tested?

Changed line https://github.com/apache/spark/blob/0d634875026ccf1eaf984996e9460d7673561f80/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L137
from `false` to `true` and made sure the unit test failed.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes apache#15203 from brkyvz/fix-test.
asfgit pushed a commit that referenced this pull request Sep 23, 2016
## What changes were proposed in this pull request?

This Backports PR #15153 and PR #15122 to Spark 2.0 branch for Structured Streaming.
It is structured a bit differently because similar code paths already existed in the 2.0 branch. The unit test makes sure that both behaviors don't break.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15202 from brkyvz/backports-to-streaming.
@brkyvz brkyvz deleted the SPARK-17569 branch February 3, 2019 20:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants