-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-17569] Make StructuredStreaming FileStreamSource batch generation faster #15122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
also cc @cloud-fan |
Test build #65512 has finished for PR 15122 at commit
|
Can you test this by deleting the file on purpose, and see what kind of exceptions are thrown? |
I looked into this. I think there are two ways that you can intercept any calls to HDFS. The first way is slightly hacky but pretty simple. FileSystem.addFileSystemForTesting is a package private method that can be used to inject a mock file system. You can create an implementation of FilterFileSystem and pass it in as "file" schema. Then all accesses to local file system will go through your implementation. Of course, you can also use a mocking library to do that, but that is not as clean since FilterFileSystem is a public interface. The second way is more robust and does not depend on any private APIs. Create an implementation of FilterFileSystem by pointing to LocalFileSystem, e.g. call it MockFileSystem. MockFileSystem.getScheme should return "mockfs://". You can then use this as the path when passing to structured streaming. This is probably a more robust, generic solution. There is also the possibility of depending on the ordering of how FileSystem and the class loader loads classes -- but I wouldn't recommend that. |
@petermaxlee I believe you will get a runtime exception saying that the file does not exist. Also, regarding your options 2, are you suggesting that users of structured streaming to use such a mock fs? Or you are suggesting that structured streaming to use such a fs. Also, why LocalFS is related to this case? |
@yhuai The suggestions are for purely testing purposes, to make sure that StructuredStreaming doesn't check for file existence twice. |
@petermaxlee Thank you for the suggestions for testing. I will try out Option 1, since 2 is a bit much work for a minor PR as this. |
ok, got it. Thanks! |
Added test using Option 2 in the end. |
Test build #65604 has finished for PR 15122 at commit
|
Test build #65605 has finished for PR 15122 at commit
|
LGTM. Let's run the test again since there are several PRs about FileStreamSource got merged in the past two days. |
Test build #3285 has finished for PR 15122 at commit
|
classOf[ExistsThrowsExceptionFileSystem].getName) | ||
// add the metadata entries as a pre-req | ||
val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir | ||
val metadataLog = new HDFSMetadataLog[Array[FileEntry]](spark, dir.getAbsolutePath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to use FileStreamSourceLog
here to add FileEntry as the latest master uses it to compact File source's logs.
@zsxwing Thanks for the comment. Updated! |
Test build #65736 has finished for PR 15122 at commit
|
LGTM. Merging to master and |
There are some conflicts with 2.0. Could you submit a patch for branch 2.0? |
## What changes were proposed in this pull request? A [PR](apache@a6aade0) was merged concurrently that made the unit test for PR apache#15122 not test anything anymore. This PR fixes the test. ## How was this patch tested? Changed line https://github.com/apache/spark/blob/0d634875026ccf1eaf984996e9460d7673561f80/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L137 from `false` to `true` and made sure the unit test failed. Author: Burak Yavuz <brkyvz@gmail.com> Closes apache#15203 from brkyvz/fix-test.
## What changes were proposed in this pull request? This Backports PR #15153 and PR #15122 to Spark 2.0 branch for Structured Streaming. It is structured a bit differently because similar code paths already existed in the 2.0 branch. The unit test makes sure that both behaviors don't break. Author: Burak Yavuz <brkyvz@gmail.com> Closes #15202 from brkyvz/backports-to-streaming.
What changes were proposed in this pull request?
While getting the batch for a
FileStreamSource
in StructuredStreaming, we know which files we must take specifically. We already have verified that they exist, and have committed them to a metadata log. When creating the FileSourceRelation however for an incremental execution, the code checks the existence of every single file once again!When you have 100,000s of files in a folder, creating the first batch takes 2 hours+ when working with S3! This PR disables that check
How was this patch tested?
Added a unit test to
FileStreamSource
.