[SPARK-17159][Streaming] optimise check for new files in FileInputDStream #17745

Closed

@@ -17,19 +17,19 @@

package org.apache.spark.streaming.dstream

import java.io.{IOException, ObjectInputStream}
import java.io.{FileNotFoundException, IOException, ObjectInputStream}

import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.StreamInputInfo
import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
import org.apache.spark.util.{SerializableConfiguration, Utils}

/**
* This class represents an input stream that monitors a Hadoop-compatible filesystem for new
@@ -122,9 +122,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()

// Read-through cache of file mod times, used to speed up mod time lookups
@transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)

// Timestamp of the last round of finding files
@transient private var lastNewFileFindingTime = 0L

@@ -140,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* a union RDD out of them. Note that this maintains the list of files that were processed
* in the latest modification time in the previous call to this method. This is because the
* modification time returned by the FileStatus API seems to return times only at the
* granularity of seconds. And new files may have the same modification time as the
* granularity of seconds in HDFS. And new files may have the same modification time as the
* latest modification time in the previous call to this method yet was not reported in
* the previous call.
*/
@@ -173,8 +170,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug("Cleared files are:\n" +
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
// Delete file mod times that weren't accessed in the last round of getting new files
fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
}

/**
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")

val newFileFilter = new PathFilter {
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val directoryFilter = new PathFilter {
override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
}
val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
Member:

In this approach, we might be fetching a very large list of files and then filtering it down to the directories. If the fetched list is too large, that can be a problem.

Member:

Looking at the code of globStatus, it does filter at the end, so doing something like the above might just be OK.

Also, globStatus does a listStatus() per child directory, or a getFileStatus() when the input pattern is not a glob; each call to listStatus() makes 3+ HTTP calls and each call to getFileStatus() makes 2 HTTP calls.
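
To make that cost comparison concrete, here is a minimal sketch (illustration only; the object and method names are placeholders, not the patch itself) of the list-then-filter strategy this diff adopts: reuse the FileStatus objects that globStatus/listStatus already return instead of issuing an extra getFileStatus() round trip per path.

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Illustration only: scan a glob of directories and pick files whose
// modification time falls inside the current batch window, with no
// per-path getFileStatus() calls.
object NewFileScanSketch {
  def newFiles(
      fs: FileSystem,
      globPath: Path,
      modTimeIgnoreThreshold: Long,
      currentTime: Long): Seq[String] = {
    // One globStatus call; a null result (no matches) becomes an empty listing.
    val matched = Option(fs.globStatus(globPath)).getOrElse(Array.empty[FileStatus])
    // The returned FileStatus already knows whether the path is a directory.
    val directories = matched.filter(_.isDirectory).map(_.getPath)
    directories.toSeq.flatMap { dir =>
      // One listStatus per directory; each FileStatus carries the modification
      // time, so no per-file getFileStatus round trip is needed.
      fs.listStatus(dir)
        .filter { status =>
          val modTime = status.getModificationTime
          modTime > modTimeIgnoreThreshold && modTime <= currentTime
        }
        .map(_.getPath.toString)
    }
  }
}
```

The real implementation additionally applies the user-supplied path filter and skips files already selected in recent batches; the sketch only shows where the saved HTTP calls come from.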

Contributor Author:

globStatus is flawed; the key limit is that it does a tree walk. It needs to be replaced with an object-store-specific listing. See HADOOP-13371.

The issue with implementing an s3a flat-list-and-filter is that if the wildcard sits a few entries above the child path and there are lots of children, e.g.

s3a://bucket/data/year=201?/month=*/day=*/

then if there are many files under the year/month/day entries, they all get listed during the filter.

What I think would need to be done is to make the FS configurable to limit the depth at which it switches to bulk listing; here I could say "depth=2", so the year=? would be done via globbing, but the month= and day= parts would be better served by the bulk listing.

Or maybe just start with making the whole thing optional, and let the caller deal with it.

Anyway, the options here are:

  • Fix the Hadoop-side call. Nice and broadly useful.
  • See if Spark can be moved off the globStatus call entirely; that would change matching semantics, but if you provide a new "cloudstore" connector, that could be done, couldn't it? (A sketch of what that might look like follows this list.)
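
As a purely hypothetical sketch of that second option (not anything in this patch, and not an existing Hadoop API pattern): do one recursive listing under the fixed, wildcard-free prefix and apply the glob client-side. On s3a, listFiles(path, recursive = true) maps to paged bulk LIST calls rather than a per-directory tree walk; the glob matching below uses java.nio's matcher as a stand-in, whose semantics differ from Hadoop's glob in some corner cases.

```scala
import java.nio.file.{FileSystems, Paths}

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}

// Hypothetical "flat list + client-side glob" scan. `root` is the fixed prefix
// (e.g. s3a://bucket/data) and `glob` is a pattern applied to the path portion
// of each listed URI, e.g. "/data/year=201?/month=*/day=*/*".
object FlatGlobScanSketch {
  def listMatching(fs: FileSystem, root: Path, glob: String): Seq[LocatedFileStatus] = {
    val matcher = FileSystems.getDefault.getPathMatcher(s"glob:$glob")
    val results = ArrayBuffer.empty[LocatedFileStatus]
    // Recursive listing: on an object store this is a bulk, paged LIST of the
    // whole prefix instead of one listStatus call per directory.
    val it = fs.listFiles(root, true)
    while (it.hasNext) {
      val status = it.next()
      if (matcher.matches(Paths.get(status.getPath.toUri.getPath))) {
        results += status
      }
    }
    results.toSeq
  }
}
```

Nothing like this is in the patch at hand; it only illustrates what "moving Spark off the globStatus call" could mean if the matching semantics were allowed to change.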

Member:

Yes, having an object-store-specific version of glob will be broadly helpful. In the meantime, this patch seems to be saving a lot of HTTP requests.

Contributor Author:

Still a lot; I think we can do a new one.

The latest version of this code is here; I think it's time to set up a module in Bahir for this.

.filter(_.isDirectory)
.map(_.getPath)
val newFiles = directories.flatMap(dir =>
fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
fs.listStatus(dir)
.filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
.map(_.getPath.toString))
val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
logInfo(s"Finding new files took $timeTaken ms")
if (timeTaken > slideDuration.milliseconds) {
logWarning(
"Time taken to find new files exceeds the batch size. " +
s"Time taken to find new files $timeTaken exceeds the batch size. " +
"Consider increasing the batch size or reducing the number of " +
"files in the monitored directory."
"files in the monitored directories."
)
}
newFiles
} catch {
case e: FileNotFoundException =>
logWarning(s"No directory to scan: $directoryPath: $e")
Array.empty
case e: Exception =>
logWarning("Error finding new files", e)
logWarning(s"Error finding new files under $directoryPath", e)
reset()
Array.empty
}
@@ -241,16 +236,24 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1).
* Hence they can get selected as new files again. To prevent this, files whose mod time is more
* than current batch time are not considered.
* @param fileStatus file status
* @param currentTime time of the batch
* @param modTimeIgnoreThreshold the ignore threshold
* @return true if the file has been modified within the batch window
*/
private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
private def isNewFile(
fileStatus: FileStatus,
currentTime: Long,
modTimeIgnoreThreshold: Long): Boolean = {
val path = fileStatus.getPath
val pathStr = path.toString
// Reject file if it does not satisfy filter
if (!filter(path)) {
logDebug(s"$pathStr rejected by filter")
return false
}
// Reject file if it was created before the ignore time
val modTime = getFileModTime(path)
val modTime = fileStatus.getModificationTime()
if (modTime <= modTimeIgnoreThreshold) {
// Use <= instead of < to avoid SPARK-4518
logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold")
@@ -292,11 +295,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
new UnionRDD(context.sparkContext, fileRDDs)
}

/** Get file mod time from cache or fetch it from the file system */
private def getFileModTime(path: Path) = {
fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
}

private def directoryPath: Path = {
if (_path == null) _path = new Path(directory)
_path
@@ -318,7 +316,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
fileToModTime = new TimeStampedHashMap[String, Long](true)
}

/**

@@ -27,7 +27,8 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.scalatest.BeforeAndAfter
@@ -130,10 +131,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}

test("binary records stream") {
var testDir: File = null
try {
withTempDir { testDir =>
val batchDuration = Seconds(2)
testDir = Utils.createTempDir()
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@@ -176,8 +175,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
assert(obtainedOutput(i) === input.map(b => (b + i).toByte))
}
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}

@@ -190,10 +187,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}

test("file input stream - wildcard") {
var testDir: File = null
try {
withTempDir { testDir =>
val batchDuration = Seconds(2)
testDir = Utils.createTempDir()
val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1")
val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2")

@@ -221,12 +216,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// not enough to trigger a batch
clock.advance(batchDuration.milliseconds / 2)

def createFileAndAdvenceTime(data: Int, dir: File): Unit = {
def createFileAndAdvanceTime(data: Int, dir: File): Unit = {
val file = new File(dir, data.toString)
Files.write(data + "\n", file, StandardCharsets.UTF_8)
assert(file.setLastModified(clock.getTimeMillis()))
assert(file.lastModified === clock.getTimeMillis())
logInfo("Created file " + file)
logInfo(s"Created file $file")
// Advance the clock after creating the file to avoid a race when
// setting its modification time
clock.advance(batchDuration.milliseconds)
@@ -236,18 +231,90 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// Over time, create files in the temp directory 1
val input1 = Seq(1, 2, 3, 4, 5)
input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1))
input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1))

// Over time, create files in the temp directory 2
val input2 = Seq(6, 7, 8, 9, 10)
input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2))
input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2))

// Verify that all the files have been read
val expectedOutput = (input1 ++ input2).map(_.toString).toSet
assert(outputQueue.asScala.flatten.toSet === expectedOutput)
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}

/**
* Tests that renamed directories are included in new batches, but that only files created
* within the batch window are included.
* Uses the Hadoop APIs to verify consistent behavior with the operations used internally.
*/
test("renamed directories are scanned") {
withTempDir { testDir =>
val batchDuration = Seconds(2)
val durationMs = batchDuration.milliseconds
val testPath = new Path(testDir.toURI)
val streamDir = new Path(testPath, "streaming")
val streamGlobPath = new Path(streamDir, "sub*")
val generatedDir = new Path(testPath, "generated")
val generatedSubDir = new Path(generatedDir, "subdir")
val renamedSubDir = new Path(streamDir, "subdir")

withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
val sparkContext = ssc.sparkContext
val hc = sparkContext.hadoopConfiguration
val fs = FileSystem.get(testPath.toUri, hc)

fs.delete(testPath, true)
fs.mkdirs(testPath)
fs.mkdirs(streamDir)
fs.mkdirs(generatedSubDir)

def write(path: Path, text: String): Unit = {
val out = fs.create(path, true)
IOUtils.write(text, out)
out.close()
}

val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val existingFile = new Path(generatedSubDir, "existing")
write(existingFile, "existing\n")
val status = fs.getFileStatus(existingFile)
clock.setTime(status.getModificationTime + durationMs)
val batchCounter = new BatchCounter(ssc)
val fileStream = ssc.textFileStream(streamGlobPath.toUri.toString)
val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
val outputStream = new TestOutputStream(fileStream, outputQueue)
outputStream.register()

ssc.start()
clock.advance(durationMs)
eventually(eventuallyTimeout) {
assert(1 === batchCounter.getNumCompletedBatches)
}
// create a file in the generated directory; the directory is renamed below
// so that the file appears under the scanned path
val textPath = new Path(generatedSubDir, "renamed.txt")
write(textPath, "renamed\n")
val now = clock.getTimeMillis()
val modTime = now + durationMs / 2
fs.setTimes(textPath, modTime, modTime)
val textFileStatus = fs.getFileStatus(textPath)
assert(textFileStatus.getModificationTime < now + durationMs)

// rename the directory under the path being scanned
fs.rename(generatedSubDir, renamedSubDir)

// move forward one window
clock.advance(durationMs)
// await the next scan completing
eventually(eventuallyTimeout) {
assert(2 === batchCounter.getNumCompletedBatches)
}
// verify that the "renamed" file is found, but not the "existing" one which is out of
// the window
assert(Set("renamed") === outputQueue.asScala.flatten.toSet)
}
}
}

@@ -416,10 +483,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}

def testFileStream(newFilesOnly: Boolean) {
var testDir: File = null
try {
withTempDir { testDir =>
val batchDuration = Seconds(2)
testDir = Utils.createTempDir()
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@@ -466,8 +531,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
assert(outputQueue.asScala.flatten.toSet === expectedOutput)
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}
}

@@ -17,7 +17,7 @@

package org.apache.spark.streaming

import java.io.{IOException, ObjectInputStream}
import java.io.{File, IOException, ObjectInputStream}
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
@@ -557,4 +557,16 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
verifyOutput[W](output.toSeq, expectedOutput, useSet)
}
}

/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
* (originally from `SqlTestUtils`.)
* @todo Probably this method should be moved to a more general place
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
try f(dir) finally Utils.deleteRecursively(dir)
}

}