@@ -80,6 +80,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
80
80
81
81
// Maps `conf` so that any contained value is wrapped in a SerializableWritable.
private val serializableConfOpt = conf.map(c => new SerializableWritable(c))
82
82
83
/**
 * Minimum duration for which the information about selected files is remembered.
 *
 * Read, in minutes, from the `spark.streaming.minRememberDurationMin` entry of the
 * configuration of the SparkContext behind `ssc`; defaults to 1 minute when unset.
 *
 * Files with mod times older than this "window" of remembering will be ignored. So if new
 * files are visible within this window, then the file will get selected in the next batch.
 */
private val minRememberDurationMin = Minutes(
  // The key and default must not carry stray whitespace: a padded key never matches the
  // configured entry, and a padded numeric default cannot be parsed as a Long.
  ssc.sparkContext.getConf.getLong("spark.streaming.minRememberDurationMin", 1L))
92
+
83
93
// Deliberately a def rather than a val: the clock is looked up through the scheduler on
// every access, which is what makes this work during checkpoint recovery.
private def clock = {
  ssc.scheduler.clock
}
85
95
@@ -95,7 +105,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
95
105
* This would allow us to filter away not-too-old files which have already been recently
96
106
* selected and processed.
97
107
*/
98
private val numBatchesToRemember =
  FileInputDStream.calculateNumBatchesToRemember(slideDuration, minRememberDurationMin)
// Total time span covered by that many batches of length slideDuration.
private val durationToRemember = slideDuration * numBatchesToRemember
// Pass that span on to remember().
remember(durationToRemember)
101
112
@@ -330,23 +341,14 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
330
341
private[streaming]
object FileInputDStream {

  /**
   * Default path filter.
   *
   * @param path the path to test
   * @return true unless the name returned by `path.getName()` starts with "."
   */
  def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")

  /**
   * Calculate the number of last batches to remember, such that all the files selected in
   * at least last minRememberDurationMin duration can be remembered.
   *
   * @param batchDuration length of a single batch
   * @param minRememberDurationMin minimum duration that has to be covered
   * @return `minRememberDurationMin / batchDuration` (compared in milliseconds), rounded up
   *         to a whole number of batches
   */
  def calculateNumBatchesToRemember(batchDuration: Duration,
      minRememberDurationMin: Duration): Int = {
    // Convert to Double before dividing so the ratio is not truncated ahead of the ceil.
    math.ceil(minRememberDurationMin.milliseconds.toDouble / batchDuration.milliseconds).toInt
  }
}
0 commit comments