
Commit abf759a

gaborgsomogyi authored and Marcelo Vanzin committed
[SPARK-29876][SS] Delete/archive file source completed files in separate thread
### What changes were proposed in this pull request?

[SPARK-20568](https://issues.apache.org/jira/browse/SPARK-20568) added the possibility to clean up completed files in a streaming query. Deleting/archiving uses the main thread, which can slow down processing. In this PR I've created a thread pool to handle file delete/archival. The number of threads can be configured with `spark.sql.streaming.fileSource.cleaner.numThreads`.

### Why are the changes needed?

To do file delete/archival in a separate thread.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #26502 from gaborgsomogyi/SPARK-29876.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent 830e635 commit abf759a
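As a quick illustration of the new knob (only the conf key `spark.sql.streaming.fileSource.cleaner.numThreads` comes from this change; the session setup and app name are hypothetical):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("file-source-cleaner-demo") // hypothetical app name
  .getOrCreate()

// Size the completed-file cleaner pool; the default is 1 thread,
// and 0 falls back to cleaning on the main micro-batch thread.
spark.conf.set("spark.sql.streaming.fileSource.cleaner.numThreads", "2")
```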

File tree

4 files changed, 50 insertions(+), 10 deletions(-)


docs/structured-streaming-programming-guide.md

Lines changed: 3 additions & 2 deletions
@@ -551,9 +551,10 @@ Here are the details of all the sources in Spark.
 When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.<br/>
 For example, suppose you provide '/hello?/spark/*' as source pattern, '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be also used as the value of "sourceArchiveDir", as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match.<br/>
 Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
-NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
+NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down, even if it's happening in separate thread) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
+Number of threads used in completed file cleaner can be configured with<code>spark.sql.streaming.fileSource.cleaner.numThreads</code> (default: 1).<br/>
 NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match to any files in output directory of file stream sink.<br/>
-NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
+NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query. Spark may not clean up some source files in some circumstances - e.g. the application doesn't shut down gracefully, too many files are queued to clean up.
 <br/><br/>
 For file-format-specific options, see the related methods in <code>DataStreamReader</code>
 (<a href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>/<a
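For context, here is a minimal sketch of how the documented options fit together, reusing the paths from the example above (the source pattern and archive directory are illustrative):

```scala
val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()

// Completed files are moved under /archived/here, preserving their own path,
// e.g. /hello1/spark/data/part-0.txt -> /archived/here/hello1/spark/data/part-0.txt.
val lines = spark.readStream
  .format("text")
  .option("cleanSource", "archive")
  .option("sourceArchiveDir", "/archived/here")
  .load("/hello?/spark/*")
```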

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 6 additions & 0 deletions
@@ -1301,6 +1301,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val FILE_SOURCE_CLEANER_NUM_THREADS =
+    buildConf("spark.sql.streaming.fileSource.cleaner.numThreads")
+      .doc("Number of threads used in the file source completed file cleaner.")
+      .intConf
+      .createWithDefault(1)
+
   val STREAMING_SCHEMA_INFERENCE =
     buildConf("spark.sql.streaming.schemaInference")
       .internal()

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 35 additions & 5 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming

 import java.net.URI
+import java.util.concurrent.ThreadPoolExecutor
 import java.util.concurrent.TimeUnit._

 import scala.util.control.NonFatal
@@ -30,7 +31,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils

 /**
  * A very simple source that reads files from the given directory as they appear.
@@ -285,7 +288,7 @@ class FileStreamSource(
     }
   }

-  override def stop(): Unit = {}
+  override def stop(): Unit = sourceCleaner.foreach(_.stop())
 }

@@ -353,8 +356,35 @@ object FileStreamSource {
     def size: Int = map.size()
   }

-  private[sql] trait FileStreamSourceCleaner {
-    def clean(entry: FileEntry): Unit
+  private[sql] abstract class FileStreamSourceCleaner extends Logging {
+    private val cleanThreadPool: Option[ThreadPoolExecutor] = {
+      val numThreads = SQLConf.get.getConf(SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS)
+      if (numThreads > 0) {
+        logDebug(s"Cleaning file source on $numThreads separate thread(s)")
+        Some(ThreadUtils.newDaemonCachedThreadPool("file-source-cleaner-threadpool", numThreads))
+      } else {
+        logDebug("Cleaning file source on main thread")
+        None
+      }
+    }
+
+    def stop(): Unit = cleanThreadPool.foreach(ThreadUtils.shutdown(_))
+
+    def clean(entry: FileEntry): Unit = {
+      cleanThreadPool match {
+        case Some(p) =>
+          p.submit(new Runnable {
+            override def run(): Unit = {
+              cleanTask(entry)
+            }
+          })
+
+        case None =>
+          cleanTask(entry)
+      }
+    }
+
+    protected def cleanTask(entry: FileEntry): Unit
   }

   private[sql] object FileStreamSourceCleaner {
@@ -448,7 +478,7 @@ object FileStreamSource {
       filters.toList
     }

-    override def clean(entry: FileEntry): Unit = {
+    override protected def cleanTask(entry: FileEntry): Unit = {
       val curPath = new Path(new URI(entry.path))
       val newPath = new Path(baseArchivePath.toString.stripSuffix("/") + curPath.toUri.getPath)

@@ -472,7 +502,7 @@ object FileStreamSource {
   private[sql] class SourceFileRemover(fileSystem: FileSystem)
       extends FileStreamSourceCleaner with Logging {

-    override def clean(entry: FileEntry): Unit = {
+    override protected def cleanTask(entry: FileEntry): Unit = {
       val curPath = new Path(new URI(entry.path))
       try {
         logDebug(s"Removing completed file $curPath")

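Viewed outside the diff, the core pattern added to `FileStreamSourceCleaner` — submit the cleanup task to a small daemon pool when `numThreads > 0`, otherwise run it inline on the calling thread — can be sketched roughly like this (simplified: `FileEntry` is reduced to a path string and Spark's `ThreadUtils` is replaced with a plain `Executors` factory):

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

abstract class CleanerSketch(numThreads: Int) {
  // With numThreads > 0 cleanup runs asynchronously; with 0 it stays on the caller's thread.
  private val pool: Option[ExecutorService] =
    if (numThreads > 0) Some(Executors.newFixedThreadPool(numThreads)) else None

  def clean(path: String): Unit = pool match {
    case Some(p) => p.submit(new Runnable { override def run(): Unit = cleanTask(path) })
    case None => cleanTask(path)
  }

  // Shut the pool down when the source stops, waiting briefly for queued tasks.
  def stop(): Unit = pool.foreach { p =>
    p.shutdown()
    p.awaitTermination(10, TimeUnit.SECONDS)
  }

  protected def cleanTask(path: String): Unit
}
```

In the actual change the pool is a daemon cached pool from `ThreadUtils`, so it will not keep the JVM alive — which lines up with the documentation note above that queued files may be left uncleaned if the application does not shut down gracefully.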
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 6 additions & 3 deletions
@@ -1636,7 +1636,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     withSQLConf(
       SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
       // Force deleting the old logs
-      SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1",
+      SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS.key -> "0"
     ) {
       val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
         "cleanSource" -> "delete")
@@ -1680,7 +1681,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     withSQLConf(
       SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
       // Force deleting the old logs
-      SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1",
+      SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS.key -> "0"
     ) {
       val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
         "cleanSource" -> "archive", "sourceArchiveDir" -> archiveDir.getAbsolutePath)
@@ -1749,7 +1751,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     withSQLConf(
       SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
       // Force deleting the old logs
-      SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1",
+      SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS.key -> "0"
     ) {
       val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
         "cleanSource" -> cleanOption, "sourceArchiveDir" -> archiveDir.getAbsolutePath)

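A note on why the tests pin the new conf to "0": with a thread pool, delete/archive becomes asynchronous, so an assertion made right after a micro-batch could race the cleaner. Forcing cleanup back onto the main thread keeps the suite's existing file-existence checks deterministic. A hypothetical minimal override (helper names invented for illustration) would look like:

```scala
withSQLConf(SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS.key -> "0") {
  runCleanupQueryOnce()            // hypothetical: one micro-batch with "cleanSource" -> "delete"
  assert(!completedFile.exists())  // safe to assert immediately: cleanup ran on the main thread
}
```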