Skip to content

[SPARK-29876][SS] Delete/archive file source completed files in separate thread #26502

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from

Conversation

gaborgsomogyi
Copy link
Contributor

What changes were proposed in this pull request?

SPARK-20568 added the possibility to clean up completed files in streaming query. Deleting/archiving uses the main thread which can slow down processing. In this PR I've created thread pool to handle file delete/archival. The number of threads can be configured with spark.sql.streaming.fileSource.cleaner.numThreads.

Why are the changes needed?

Do file delete/archival in separate thread.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing unit tests.

@gaborgsomogyi
Copy link
Contributor Author

The reason why I've added number of threads configuration possibility is the archiving part. Namely move can be copy on couple of file systems (like S3) which is time consuming and the cleaner maybe not able to keep the speed with the streaming query.

@SparkQA
Copy link

SparkQA commented Nov 13, 2019

Test build #113697 has finished for PR 26502 at commit 7f14b55.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor Author

gaborgsomogyi commented Nov 13, 2019

cc @vanzin @HeartSaVioR since you know SPARK-20568 well.

@@ -550,7 +550,8 @@ Here are the details of all the sources in Spark.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. <code>/archived/here</code>. This will ensure archived files are never included as new source files.<br/>
Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down, even if it's happening in separate thread) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
Number of threads used in completed file cleaner can be configured with<code>spark.sql.streaming.fileSource.cleaner.numThreads</code>.<br/>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We configure cleanSource in FileStreamSourceOption; it should be available in same place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its implementation detail which not necessarily should appear on source option level. I would add it if per source configuration is required (but then maybe --conf can be used).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the benefit of placing it to configuration, but still don't feel intuitive to configure this to here and that to over there. Let's hear others' voices.

@SparkQA
Copy link

SparkQA commented Nov 20, 2019

Test build #114165 has finished for PR 26502 at commit b0b9714.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Nov 20, 2019

Test build #114173 has finished for PR 26502 at commit b0b9714.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd still like to hear others' voices for "source option" vs "SQL configuration", but other parts look great.

@gaborgsomogyi
Copy link
Contributor Author

"source option" vs "SparkConf" is better I think.

@gaborgsomogyi
Copy link
Contributor Author

@zsxwing this is an addition to the source file archival/delete which worth the attention.

Copy link
Contributor

@vanzin vanzin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't have an opinion about the configuration. Other than maybe it should be by default just 1 thread.

@gaborgsomogyi
Copy link
Contributor Author

Other than maybe it should be by default just 1 thread.

Set it to 1 + moved the clean functionality into the base class.

@SparkQA
Copy link

SparkQA commented Dec 3, 2019

Test build #114777 has finished for PR 26502 at commit 19b9c92.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Copy link
Contributor

vanzin commented Dec 4, 2019

Do the tests need any adjustment given that the cleaning is asynchronous now? (Maybe to disable the cleaning thread.)

@HeartSaVioR
Copy link
Contributor

Do the tests need any adjustment given that the cleaning is asynchronous now? (Maybe to disable the cleaning thread.)

How about modifying clean to call cleanTask synchronously if it's being called from UT? (Would checking IS_TESTING in SparkConf work like what we do with FsHistoryProvider?) Given we have some UTs which don't directly call cleaner.clean, we still have to run cleanup in UTs.

Alternatively we can check the files with eventually but it would add some latencies on verification.

@gaborgsomogyi
Copy link
Contributor Author

Started to catch-up which will take some time and going to continue...

@gaborgsomogyi
Copy link
Contributor Author

Yeah, the thread must be turned off not to have flakyness. IS_TESTING is something what I would like to avoid unless there is no other way. eventually has a drawback of additional latency as you've mentioned @HeartSaVioR . I think the easiest and simplest is to set the config to 0. I'm going to add this change in the last commit.

* Add doc default
* Add protected
* Turn off threads in tests
Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except the nit commented.

@SparkQA
Copy link

SparkQA commented Jan 14, 2020

Test build #116716 has finished for PR 26502 at commit 74431de.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 14, 2020

Test build #116718 has finished for PR 26502 at commit b6af107.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 15, 2020

Test build #116776 has finished for PR 26502 at commit e3cb6e0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 16, 2020

Test build #116853 has finished for PR 26502 at commit 3726b08.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Copy link
Contributor

vanzin commented Jan 17, 2020

Merging to master.

@vanzin vanzin closed this in abf759a Jan 17, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants