package org.apache.spark.streaming.receiver

import org.apache.spark.{Logging, SparkConf}
import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}

/** Provides waitToPush() method to limit the rate at which receivers consume data.
 *
 */
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {

  // Maximum number of records per second a receiver may push; <= 0 (the default)
  // means rate limiting is disabled entirely.
  private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)

  // Guava RateLimiter issuing `desiredRate` permits per second (token-bucket style;
  // acquire() blocks the caller until a permit is available). Declared lazy so that
  // no limiter is ever constructed when rate limiting is disabled, since
  // GuavaRateLimiter.create rejects non-positive rates.
  private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)

  /**
   * Blocks the calling thread just long enough to keep the overall push rate at or
   * below `spark.streaming.receiver.maxRate` records per second. Returns immediately
   * when the configured rate is non-positive (rate limiting disabled).
   */
  def waitToPush() {
    if (desiredRate <= 0) {
      return
    }
    // Blocks until one permit is granted, smoothing bursts to the desired rate.
    rateLimiter.acquire()
  }
}