Skip to content

Commit 766f938

Browse files
committed
SPARK-3276 Switched to using newly added getTimeAsSeconds method.
* Switched to using newly added getTimeAsSeconds method (see apache#5236) * Renamed minRememberDuration to minRememberDurationS to be compatible with the examples in the pull request above.
1 parent affee1d commit 766f938

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
8686
* Files with mod times older than this "window" of remembering will be ignored. So if new
8787
* files are visible within this window, then the file will get selected in the next batch.
8888
*/
89-
private val minRememberDuration =
90-
Seconds(ssc.conf.getLong("spark.streaming.minRememberDuration", 60L))
89+
private val minRememberDurationS =
90+
Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.minRememberDuration", "60s"))
9191

9292
// This is a def so that it works during checkpoint recovery:
9393
private def clock = ssc.scheduler.clock
@@ -105,7 +105,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
105105
* selected and processed.
106106
*/
107107
private val numBatchesToRemember = FileInputDStream
108-
.calculateNumBatchesToRemember(slideDuration, minRememberDuration)
108+
.calculateNumBatchesToRemember(slideDuration, minRememberDurationS)
109109
private val durationToRemember = slideDuration * numBatchesToRemember
110110
remember(durationToRemember)
111111

@@ -344,10 +344,10 @@ object FileInputDStream {
344344

345345
/**
346346
* Calculate the number of last batches to remember, such that all the files selected in
347-
* at least last minRememberDuration duration can be remembered.
347+
* at least last minRememberDurationS duration can be remembered.
348348
*/
349349
def calculateNumBatchesToRemember(batchDuration: Duration,
350-
minRememberDuration: Duration): Int = {
351-
math.ceil(minRememberDuration.milliseconds.toDouble / batchDuration.milliseconds).toInt
350+
minRememberDurationS: Duration): Int = {
351+
math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt
352352
}
353353
}

0 commit comments

Comments
 (0)