Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkConf, SerializableWritable}
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming._
import org.apache.spark.util.{TimeStampedHashMap, Utils}
Expand Down Expand Up @@ -63,7 +63,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
* the streaming app.
* - If a file is to be visible in the directory listings, it must be visible within a certain
* duration of the mod time of the file. This duration is the "remember window", which is set to
* 1 minute (see `FileInputDStream.MIN_REMEMBER_DURATION`). Otherwise, the file will never be
* 1 minute (see `FileInputDStream.minRememberDuration`). Otherwise, the file will never be
* selected as the mod time will be less than the ignore threshold when it becomes visible.
* - Once a file is visible, the mod time cannot change. If it does due to appends, then the
* processing semantics are undefined.
Expand All @@ -80,6 +80,15 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](

private val serializableConfOpt = conf.map(new SerializableWritable(_))

/**
* Minimum duration of remembering the information of selected files. Defaults to 60 seconds.
*
* Files with mod times older than this "window" of remembering will be ignored. So if new
* files are visible within this window, then the file will get selected in the next batch.
*/
private val minRememberDurationS =
Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.minRememberDuration", "60s"))

// This is a def so that it works during checkpoint recovery:
private def clock = ssc.scheduler.clock

Expand All @@ -95,7 +104,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
* This would allow us to filter away not-too-old files which have already been recently
* selected and processed.
*/
private val numBatchesToRemember = FileInputDStream.calculateNumBatchesToRemember(slideDuration)
private val numBatchesToRemember = FileInputDStream
.calculateNumBatchesToRemember(slideDuration, minRememberDurationS)
private val durationToRemember = slideDuration * numBatchesToRemember
remember(durationToRemember)

Expand Down Expand Up @@ -330,20 +340,14 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
private[streaming]
object FileInputDStream {

/**
* Minimum duration of remembering the information of selected files. Files with mod times
* older than this "window" of remembering will be ignored. So if new files are visible
* within this window, then the file will get selected in the next batch.
*/
private val MIN_REMEMBER_DURATION = Minutes(1)

def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")

/**
* Calculate the number of last batches to remember, such that all the files selected in
* at least last MIN_REMEMBER_DURATION duration can be remembered.
* at least last minRememberDurationS duration can be remembered.
*/
def calculateNumBatchesToRemember(batchDuration: Duration): Int = {
math.ceil(MIN_REMEMBER_DURATION.milliseconds.toDouble / batchDuration.milliseconds).toInt
def calculateNumBatchesToRemember(batchDuration: Duration,
minRememberDurationS: Duration): Int = {
math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt
}
}