[SPARK-29876][SS] Delete/archive file source completed files in separate thread #26502

Closed · wants to merge 7 commits
5 changes: 3 additions & 2 deletions docs/structured-streaming-programming-guide.md
@@ -551,9 +551,10 @@ Here are the details of all the sources in Spark.
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.<br/>
For example, suppose you provide '/hello?/spark/*' as source pattern, '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be also used as the value of "sourceArchiveDir", as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match.<br/>
Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
NOTE: Both archiving (via moving) and deleting completed files introduce overhead (a slowdown, even though it happens in a separate thread) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which can be an expensive operation.<br/>
The number of threads used in the completed file cleaner can be configured with <code>spark.sql.streaming.fileSource.cleaner.numThreads</code> (default: 1).<br/>
NOTE 2: The source path should not be used by multiple sources or queries when this option is enabled. Similarly, you must ensure the source path doesn't match any files in the output directory of a file stream sink.<br/>
NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query. Spark may not clean up some source files in some circumstances, e.g. when the application doesn't shut down gracefully or too many files are queued to clean up.
<br/><br/>
For file-format-specific options, see the related methods in <code>DataStreamReader</code>
(<a href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>/<a
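To make the documented options above concrete, here is a hedged sketch of how they might be wired together in a small application. The paths, the `local[2]` master, the thread count of 4, and the console sink are illustrative assumptions, not values taken from this PR; only the option names (`cleanSource`, `sourceArchiveDir`) and the conf key come from the change itself.

```scala
import org.apache.spark.sql.SparkSession

object FileCleanerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("file-source-cleaner-example")
      .master("local[2]")
      // Hypothetical value: clean completed files on 4 background threads.
      .config("spark.sql.streaming.fileSource.cleaner.numThreads", "4")
      .getOrCreate()

    // Archive each source file once the micro-batch that read it completes.
    // "/data/incoming" and "/archived/here" are placeholder paths; the archive
    // directory must not overlap the source pattern, as described above.
    val lines = spark.readStream
      .format("text")
      .option("cleanSource", "archive")
      .option("sourceArchiveDir", "/archived/here")
      .load("/data/incoming")

    val query = lines.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/file-cleaner-example")
      .start()

    query.awaitTermination()
  }
}
```

Setting `cleanSource` to "delete" removes completed files instead of moving them, and setting the cleaner thread count to 0 runs the clean-up on the main thread (the tests changed below rely on that).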
@@ -1316,6 +1316,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val FILE_SOURCE_CLEANER_NUM_THREADS =
buildConf("spark.sql.streaming.fileSource.cleaner.numThreads")
.doc("Number of threads used in the file source completed file cleaner.")
.intConf
.createWithDefault(1)

val STREAMING_SCHEMA_INFERENCE =
buildConf("spark.sql.streaming.schemaInference")
.internal()
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.streaming

import java.net.URI
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit._

import scala.util.control.NonFatal
@@ -30,7 +31,9 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ThreadUtils

/**
* A very simple source that reads files from the given directory as they appear.
@@ -285,7 +288,7 @@ class FileStreamSource(
}
}

override def stop(): Unit = {}
override def stop(): Unit = sourceCleaner.foreach(_.stop())
}


@@ -353,8 +356,35 @@ object FileStreamSource {
def size: Int = map.size()
}

private[sql] trait FileStreamSourceCleaner {
def clean(entry: FileEntry): Unit
private[sql] abstract class FileStreamSourceCleaner extends Logging {
private val cleanThreadPool: Option[ThreadPoolExecutor] = {
val numThreads = SQLConf.get.getConf(SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS)
if (numThreads > 0) {
logDebug(s"Cleaning file source on $numThreads separate thread(s)")
Some(ThreadUtils.newDaemonCachedThreadPool("file-source-cleaner-threadpool", numThreads))
} else {
logDebug("Cleaning file source on main thread")
None
}
}

def stop(): Unit = cleanThreadPool.foreach(ThreadUtils.shutdown(_))

def clean(entry: FileEntry): Unit = {
cleanThreadPool match {
case Some(p) =>
p.submit(new Runnable {
override def run(): Unit = {
cleanTask(entry)
}
})

case None =>
cleanTask(entry)
}
}

protected def cleanTask(entry: FileEntry): Unit
}
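The submit-or-run-inline pattern of the abstract class above can be illustrated with a small stand-alone sketch. `DemoCleaner`, its fixed pool size, and the `println` action are hypothetical and only mirror the structure of `FileStreamSourceCleaner` (which uses `ThreadUtils` and `SQLConf` instead); they are not part of this change.

```scala
import java.util.concurrent.{Executors, ExecutorService, TimeUnit}

// Illustrative stand-in for the pattern used by FileStreamSourceCleaner:
// with a configured thread pool, clean-up work is submitted and the
// micro-batch thread moves on; with no pool, the work runs synchronously.
abstract class DemoCleaner(numThreads: Int) {
  private val pool: Option[ExecutorService] =
    if (numThreads > 0) Some(Executors.newFixedThreadPool(numThreads)) else None

  def clean(path: String): Unit = pool match {
    case Some(p) => p.submit(new Runnable { override def run(): Unit = cleanTask(path) })
    case None    => cleanTask(path)
  }

  def stop(): Unit = pool.foreach { p =>
    p.shutdown()
    p.awaitTermination(10, TimeUnit.SECONDS)
  }

  // Concrete cleaners (delete vs. archive) implement only the per-file action.
  protected def cleanTask(path: String): Unit
}

object DemoCleanerExample {
  def main(args: Array[String]): Unit = {
    val cleaner = new DemoCleaner(numThreads = 2) {
      override protected def cleanTask(path: String): Unit =
        println(s"would delete or archive: $path")
    }
    cleaner.clean("/data/incoming/part-0001.txt")
    cleaner.stop()
  }
}
```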

private[sql] object FileStreamSourceCleaner {
@@ -448,7 +478,7 @@ object FileStreamSource {
filters.toList
}

override def clean(entry: FileEntry): Unit = {
override protected def cleanTask(entry: FileEntry): Unit = {
val curPath = new Path(new URI(entry.path))
val newPath = new Path(baseArchivePath.toString.stripSuffix("/") + curPath.toUri.getPath)

@@ -472,7 +502,7 @@
private[sql] class SourceFileRemover(fileSystem: FileSystem)
extends FileStreamSourceCleaner with Logging {

override def clean(entry: FileEntry): Unit = {
override protected def cleanTask(entry: FileEntry): Unit = {
val curPath = new Path(new URI(entry.path))
try {
logDebug(s"Removing completed file $curPath")
@@ -1636,7 +1636,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
withSQLConf(
SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
// Force deleting the old logs
SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1",
SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS.key -> "0"
) {
val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
"cleanSource" -> "delete")
@@ -1680,7 +1681,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
withSQLConf(
SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
// Force deleting the old logs
SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1",
SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS.key -> "0"
) {
val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
"cleanSource" -> "archive", "sourceArchiveDir" -> archiveDir.getAbsolutePath)
@@ -1749,7 +1751,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
withSQLConf(
SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
// Force deleting the old logs
SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1",
SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS.key -> "0"
) {
val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
"cleanSource" -> cleanOption, "sourceArchiveDir" -> archiveDir.getAbsolutePath)