[SPARK-29876][SS] Delete/archive file source completed files in separate thread #26502
The reason I added a configurable number of threads is the archiving part. On some file systems (such as S3) a move can be implemented as a copy, which is time-consuming, so the cleaner may not be able to keep up with the streaming query.
Test build #113697 has finished for PR 26502 at commit
cc @vanzin @HeartSaVioR since you know SPARK-20568 well.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
Outdated
@@ -550,7 +550,8 @@ Here are the details of all the sources in Spark.
 Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
 When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. <code>/archived/here</code>. This will ensure archived files are never included as new source files.<br/>
 Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
-NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
+NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down, even if it's happening in separate thread) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
+Number of threads used in completed file cleaner can be configured with<code>spark.sql.streaming.fileSource.cleaner.numThreads</code>.<br/>
We configure cleanSource in FileStreamSourceOption; it should be available in the same place.
I think it's an implementation detail that shouldn't necessarily appear at the source option level. I would add it if per-source configuration is required (but then maybe --conf can be used).
I see the benefit of placing it in the configuration, but it still doesn't feel intuitive to configure this here and that over there. Let's hear others' voices.
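For reference, the archive path mapping described in the docs text under review can be sketched as below. This is only an illustration of the documented example, not Spark's implementation: the class name `ArchivePathSketch` and the helper `archivedPath` are hypothetical, and the sketch uses plain string handling rather than Hadoop `Path` objects.

```java
class ArchivePathSketch {
    // Hypothetical helper (not a Spark API): places the source file's full
    // path underneath the archive directory, as the docs example describes.
    static String archivedPath(String archiveDir, String sourceFile) {
        // Drop a trailing slash so the two parts join with exactly one separator.
        String base = archiveDir.endsWith("/")
                ? archiveDir.substring(0, archiveDir.length() - 1)
                : archiveDir;
        // Make sure the source path starts with a separator before appending it.
        String relative = sourceFile.startsWith("/") ? sourceFile : "/" + sourceFile;
        return base + relative;
    }

    public static void main(String[] args) {
        // Values taken from the documentation example in the diff.
        System.out.println(archivedPath("/archived/here", "/a/b/dataset.txt"));
        // prints /archived/here/a/b/dataset.txt
    }
}
```

Because the source path is preserved under the archive directory, two files with the same name in different source directories do not collide after archiving.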
Test build #114165 has finished for PR 26502 at commit
retest this please
Test build #114173 has finished for PR 26502 at commit
I'd still like to hear others' voices for "source option" vs "SQL configuration", but other parts look great.
"source option" vs "SparkConf" is better I think.
@zsxwing this is an addition to the source file archival/delete which is worth your attention.
Don't have an opinion about the configuration. Other than maybe it should be by default just 1 thread.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
Outdated
Set it to 1 + moved the |
Test build #114777 has finished for PR 26502 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
Outdated
Do the tests need any adjustment given that the cleaning is asynchronous now? (Maybe to disable the cleaning thread.)
How about modifying Alternatively we can check the files with
Started to catch up, which will take some time; going to continue...
Yeah, the thread must be turned off to avoid flakiness.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
Outdated
LGTM except the nit commented.
Test build #116716 has finished for PR 26502 at commit
Test build #116718 has finished for PR 26502 at commit
Test build #116776 has finished for PR 26502 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
Outdated
Test build #116853 has finished for PR 26502 at commit
Merging to master.
What changes were proposed in this pull request?
SPARK-20568 added the possibility to clean up completed files in a streaming query. Deleting/archiving uses the main thread, which can slow down processing. In this PR I've created a thread pool to handle file delete/archival. The number of threads can be configured with spark.sql.streaming.fileSource.cleaner.numThreads.
Why are the changes needed?
Do file delete/archival in separate thread.
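The idea can be illustrated with a minimal, standalone sketch using only `java.util.concurrent`. It is not the code from this PR: the class `CompletedFileCleanerSketch`, its methods, and the "zero threads means clean on the caller's thread" behavior are assumptions made for illustration, and the clean action is a plain callback rather than a real file system delete or move.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class CompletedFileCleanerSketch {
    // null means "no pool": cleaning runs on the caller's thread (assumed behavior).
    private final ExecutorService pool;
    // Stand-in for the real delete/archive operation.
    private final Consumer<String> cleanAction;

    CompletedFileCleanerSketch(int numThreads, Consumer<String> cleanAction) {
        this.cleanAction = cleanAction;
        this.pool = numThreads > 0
                ? Executors.newFixedThreadPool(numThreads, runnable -> {
                    // Daemon threads so a pending clean never blocks JVM exit.
                    Thread t = new Thread(runnable, "file-source-cleaner-sketch");
                    t.setDaemon(true);
                    return t;
                })
                : null;
    }

    // Submits a completed file for cleaning without blocking the caller
    // when a pool is configured.
    void clean(String path) {
        if (pool == null) {
            cleanAction.accept(path);
        } else {
            pool.execute(() -> cleanAction.accept(path));
        }
    }

    // Stops accepting new work and waits for queued cleans to finish.
    boolean shutdownAndWait(long timeoutMs) {
        if (pool == null) {
            return true;
        }
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> cleaned = new CopyOnWriteArrayList<>();
        CompletedFileCleanerSketch cleaner = new CompletedFileCleanerSketch(1, cleaned::add);
        cleaner.clean("/a/b/dataset-1.txt");
        cleaner.clean("/a/b/dataset-2.txt");
        cleaner.shutdownAndWait(5000);
        System.out.println(cleaned.size()); // prints 2
    }
}
```

In this sketch the processing thread only pays the cost of enqueueing a task. A synchronous mode like the hypothetical zero-thread case here is one way to keep tests deterministic, since assertions on the cleaned files can then run immediately after the call.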
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing unit tests.