[SPARK-17159][STREAM] Significant speed up for running spark streaming against Object store. #22339

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
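The core change, visible in the diff below, is to stop issuing a separate getFileStatus() call for every candidate path (previously cached in a TimeStampedHashMap) and to work directly on the FileStatus objects that globStatus/listStatus already return. Against an object store, each extra status probe is typically a separate remote request, which is where the slowdown came from. A minimal sketch of the before/after listing pattern, using illustrative method and variable names rather than the exact Spark internals:

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}

// Before (sketch): PathFilter-based filtering forces one getFileStatus() per matched path.
def findNewFilesOld(fs: FileSystem, glob: Path, threshold: Long): Array[String] = {
  val dirFilter = new PathFilter {
    override def accept(p: Path): Boolean = fs.getFileStatus(p).isDirectory // extra call per match
  }
  // Note: globStatus returns null when nothing matches, which the old code did not guard against.
  val dirs = fs.globStatus(glob, dirFilter).map(_.getPath)
  dirs.flatMap { dir =>
    fs.listStatus(dir, new PathFilter {
      // yet another getFileStatus() per file, just to read its modification time
      override def accept(p: Path): Boolean = fs.getFileStatus(p).getModificationTime > threshold
    }).map(_.getPath.toString)
  }
}

// After (sketch): reuse the FileStatus entries the listing already produced.
def findNewFilesNew(fs: FileSystem, glob: Path, threshold: Long): Array[String] = {
  val dirs = Option(fs.globStatus(glob)).getOrElse(Array.empty[FileStatus])
    .filter(_.isDirectory)
    .map(_.getPath)
  dirs.flatMap { dir =>
    fs.listStatus(dir)
      .filter(_.getModificationTime > threshold) // no additional status calls
      .map(_.getPath.toString)
  }
}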
@@ -17,19 +17,19 @@

package org.apache.spark.streaming.dstream

import java.io.{IOException, ObjectInputStream}
import java.io.{FileNotFoundException, IOException, ObjectInputStream}

import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.StreamInputInfo
import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
import org.apache.spark.util.{SerializableConfiguration, Utils}

/**
* This class represents an input stream that monitors a Hadoop-compatible filesystem for new
@@ -122,9 +122,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()

// Read-through cache of file mod times, used to speed up mod time lookups
@transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)

// Timestamp of the last round of finding files
@transient private var lastNewFileFindingTime = 0L

@@ -140,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* a union RDD out of them. Note that this maintains the list of files that were processed
* in the latest modification time in the previous call to this method. This is because the
* modification time returned by the FileStatus API seems to return times only at the
* granularity of seconds. And new files may have the same modification time as the
* granularity of seconds in HDFS. And new files may have the same modification time as the
* latest modification time in the previous call to this method yet was not reported in
* the previous call.
*/
@@ -173,8 +170,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug("Cleared files are:\n" +
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
// Delete file mod times that weren't accessed in the last round of getting new files
fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
}

/**
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")

val newFileFilter = new PathFilter {
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val directoryFilter = new PathFilter {
override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
}
val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
Review comment (Member): I guess the .getOrElse could come at the end, but it hardly matters.
.filter(_.isDirectory)
.map(_.getPath)
val newFiles = directories.flatMap(dir =>
fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
fs.listStatus(dir)
.filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
.map(_.getPath.toString))
val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
logDebug(s"Finding new files took $timeTaken ms")
if (timeTaken > slideDuration.milliseconds) {
logWarning(
"Time taken to find new files exceeds the batch size. " +
s"Time taken to find new files $timeTaken exceeds the batch size. " +
"Consider increasing the batch size or reducing the number of " +
"files in the monitored directory."
"files in the monitored directories."
)
}
newFiles
} catch {
case e: FileNotFoundException =>
logWarning(s"No directory to scan: $directoryPath: $e")
Array.empty
case e: Exception =>
logWarning("Error finding new files", e)
logWarning(s"Error finding new files under $directoryPath", e)
reset()
Array.empty
}
@@ -241,16 +236,24 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1).
* Hence they can get selected as new files again. To prevent this, files whose mod time is more
* than current batch time are not considered.
* @param fileStatus file status
* @param currentTime time of the batch
* @param modTimeIgnoreThreshold the ignore threshold
* @return true if the file has been modified within the batch window
*/
private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
private def isNewFile(
fileStatus: FileStatus,
currentTime: Long,
modTimeIgnoreThreshold: Long): Boolean = {
val path = fileStatus.getPath
val pathStr = path.toString
// Reject file if it does not satisfy filter
if (!filter(path)) {
logDebug(s"$pathStr rejected by filter")
return false
}
// Reject file if it was created before the ignore time
val modTime = getFileModTime(path)
val modTime = fileStatus.getModificationTime()
if (modTime <= modTimeIgnoreThreshold) {
// Use <= instead of < to avoid SPARK-4518
logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold")
@@ -292,11 +295,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
new UnionRDD(context.sparkContext, fileRDDs)
}

/** Get file mod time from cache or fetch it from the file system */
private def getFileModTime(path: Path) = {
fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
}

private def directoryPath: Path = {
if (_path == null) _path = new Path(directory)
_path
@@ -318,7 +316,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
fileToModTime = new TimeStampedHashMap[String, Long](true)
}

/**
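As a recap of the selection rule described in the isNewFile scaladoc above, the check reduces to a small predicate over the batch window; a self-contained sketch, assuming remembered files are tracked in a plain Set (field and parameter names are illustrative, not the actual Spark internals):

import org.apache.hadoop.fs.FileStatus

// Accept a file only if its modification time lies in (modTimeIgnoreThreshold, currentTime]
// and it has not already been selected in a remembered batch.
def isInBatchWindow(
    status: FileStatus,
    currentTime: Long,
    modTimeIgnoreThreshold: Long,
    recentlySelected: Set[String]): Boolean = {
  val modTime = status.getModificationTime
  if (modTime <= modTimeIgnoreThreshold) false // too old; <= rather than < avoids SPARK-4518
  else if (modTime > currentTime) false // mod time beyond the current batch; defer to a later batch
  else !recentlySelected.contains(status.getPath.toString) // skip files already processed
}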
@@ -27,7 +27,8 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.scalatest.BeforeAndAfter
@@ -130,10 +131,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}

test("binary records stream") {
var testDir: File = null
try {
withTempDir { testDir =>
val batchDuration = Seconds(2)
testDir = Utils.createTempDir()
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@@ -176,8 +175,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
assert(obtainedOutput(i) === input.map(b => (b + i).toByte))
}
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}

@@ -190,10 +187,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}

test("file input stream - wildcard") {
var testDir: File = null
try {
withTempDir { testDir =>
val batchDuration = Seconds(2)
testDir = Utils.createTempDir()
val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1")
val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2")

@@ -221,12 +216,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// not enough to trigger a batch
clock.advance(batchDuration.milliseconds / 2)

def createFileAndAdvenceTime(data: Int, dir: File): Unit = {
def createFileAndAdvanceTime(data: Int, dir: File): Unit = {
val file = new File(testSubDir1, data.toString)
Files.write(data + "\n", file, StandardCharsets.UTF_8)
assert(file.setLastModified(clock.getTimeMillis()))
assert(file.lastModified === clock.getTimeMillis())
logInfo("Created file " + file)
logInfo(s"Created file $file")
// Advance the clock after creating the file to avoid a race when
// setting its modification time
clock.advance(batchDuration.milliseconds)
@@ -236,18 +231,85 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// Over time, create files in the temp directory 1
val input1 = Seq(1, 2, 3, 4, 5)
input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1))
input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1))

// Over time, create files in the temp directory 1
val input2 = Seq(6, 7, 8, 9, 10)
input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2))
input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2))

// Verify that all the files have been read
val expectedOutput = (input1 ++ input2).map(_.toString).toSet
assert(outputQueue.asScala.flatten.toSet === expectedOutput)
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}

test("Modified files are correctly detected.") {
withTempDir { testDir =>
val batchDuration = Seconds(2)
val durationMs = batchDuration.milliseconds
val testPath = new Path(testDir.toURI)
val streamDir = new Path(testPath, "streaming")
val streamGlobPath = new Path(streamDir, "sub*")
val generatedDir = new Path(testPath, "generated")
val generatedSubDir = new Path(generatedDir, "subdir")
val renamedSubDir = new Path(streamDir, "subdir")

withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
val sparkContext = ssc.sparkContext
val hc = sparkContext.hadoopConfiguration
val fs = FileSystem.get(testPath.toUri, hc)

fs.delete(testPath, true)
fs.mkdirs(testPath)
fs.mkdirs(streamDir)
fs.mkdirs(generatedSubDir)

def write(path: Path, text: String): Unit = {
val out = fs.create(path, true)
IOUtils.write(text, out)
out.close()
}

val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val existingFile = new Path(generatedSubDir, "existing")
write(existingFile, "existing\n")
val status = fs.getFileStatus(existingFile)
clock.setTime(status.getModificationTime + durationMs)
val batchCounter = new BatchCounter(ssc)
val fileStream = ssc.textFileStream(streamGlobPath.toUri.toString)
val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
val outputStream = new TestOutputStream(fileStream, outputQueue)
outputStream.register()

ssc.start()
clock.advance(durationMs)
eventually(eventuallyTimeout) {
assert(1 === batchCounter.getNumCompletedBatches)
}
// create and rename the file
// put a file into the generated directory
val textPath = new Path(generatedSubDir, "renamed.txt")
write(textPath, "renamed\n")
val now = clock.getTimeMillis()
val modTime = now + durationMs / 2
fs.setTimes(textPath, modTime, modTime)
val textFilestatus = fs.getFileStatus(existingFile)
assert(textFilestatus.getModificationTime < now + durationMs)

// rename the directory under the path being scanned
fs.rename(generatedSubDir, renamedSubDir)

// move forward one window
clock.advance(durationMs)
// await the next scan completing
eventually(eventuallyTimeout) {
assert(2 === batchCounter.getNumCompletedBatches)
}
// verify that the "renamed" file is found, but not the "existing" one which is out of
// the window
assert(Set("renamed") === outputQueue.asScala.flatten.toSet)
}
}
}

@@ -416,10 +478,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}

def testFileStream(newFilesOnly: Boolean) {
var testDir: File = null
try {
withTempDir { testDir =>
val batchDuration = Seconds(2)
testDir = Utils.createTempDir()
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@@ -466,8 +526,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
assert(outputQueue.asScala.flatten.toSet === expectedOutput)
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}
}
@@ -17,7 +17,7 @@

package org.apache.spark.streaming

import java.io.{IOException, ObjectInputStream}
import java.io.{File, IOException, ObjectInputStream}
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -557,4 +557,16 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
verifyOutput[W](output.toSeq, expectedOutput, useSet)
}
}

/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
* (originally from `SqlTestUtils`.)
* @todo Probably this method should be moved to a more general place
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
try f(dir) finally Utils.deleteRecursively(dir)
}

}
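As a usage note, the helper replaces the manual createTempDir/deleteRecursively bookkeeping removed from the tests above; a hypothetical call site inside a test that mixes in TestSuiteBase might look like this (file name and contents are illustrative):

withTempDir { dir =>
  // dir exists for the duration of the block
  val f = new java.io.File(dir, "sample.txt")
  java.nio.file.Files.write(f.toPath, "sample\n".getBytes(java.nio.charset.StandardCharsets.UTF_8))
  assert(f.exists())
} // dir is deleted recursively on exit, even if the body throws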