Skip to content

Commit affee1d

Browse files
committed
SPARK-3276 Changed the property name and variable name for minRememberDuration
* Switched back to using `spark.streaming.minRememberDuration`.
* Renamed `minRememberDurationMin` to `minRememberDuration`.
1 parent c9d58ca commit affee1d

File tree

1 file changed

+8
-8
lines changed

1 file changed

+8
-8
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
6363
* the streaming app.
6464
* - If a file is to be visible in the directory listings, it must be visible within a certain
6565
* duration of the mod time of the file. This duration is the "remember window", which is set to
66-
* 1 minute (see `FileInputDStream.minRememberDurationMin`). Otherwise, the file will never be
66+
* 1 minute (see `FileInputDStream.minRememberDuration`). Otherwise, the file will never be
6767
* selected as the mod time will be less than the ignore threshold when it becomes visible.
6868
* - Once a file is visible, the mod time cannot change. If it does due to appends, then the
6969
* processing semantics are undefined.
@@ -81,13 +81,13 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
8181
private val serializableConfOpt = conf.map(new SerializableWritable(_))
8282

8383
/**
84-
* Minimum duration of remembering the information of selected files. Defaults to 1 minute.
84+
* Minimum duration of remembering the information of selected files. Defaults to 60 seconds.
8585
*
8686
* Files with mod times older than this "window" of remembering will be ignored. So if new
8787
* files are visible within this window, then the file will get selected in the next batch.
8888
*/
89-
private val minRememberDurationMin =
90-
Minutes(ssc.conf.getLong("spark.streaming.minRememberDurationMin", 1L))
89+
private val minRememberDuration =
90+
Seconds(ssc.conf.getLong("spark.streaming.minRememberDuration", 60L))
9191

9292
// This is a def so that it works during checkpoint recovery:
9393
private def clock = ssc.scheduler.clock
@@ -105,7 +105,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
105105
* selected and processed.
106106
*/
107107
private val numBatchesToRemember = FileInputDStream
108-
.calculateNumBatchesToRemember(slideDuration, minRememberDurationMin)
108+
.calculateNumBatchesToRemember(slideDuration, minRememberDuration)
109109
private val durationToRemember = slideDuration * numBatchesToRemember
110110
remember(durationToRemember)
111111

@@ -344,10 +344,10 @@ object FileInputDStream {
344344

345345
/**
346346
* Calculate the number of last batches to remember, such that all the files selected in
347-
* at least last minRememberDurationMin duration can be remembered.
347+
* at least last minRememberDuration duration can be remembered.
348348
*/
349349
def calculateNumBatchesToRemember(batchDuration: Duration,
350-
minRememberDurationMin: Duration): Int = {
351-
math.ceil(minRememberDurationMin.milliseconds.toDouble / batchDuration.milliseconds).toInt
350+
minRememberDuration: Duration): Int = {
351+
math.ceil(minRememberDuration.milliseconds.toDouble / batchDuration.milliseconds).toInt
352352
}
353353
}

0 commit comments

Comments (0)