Skip to content

Commit 948f2f6

Browse files
David McGuire authored and srowen committed
[SPARK-6985][streaming] Receiver maxRate over 1000 causes a StackOverflowError
A simple truncation in integer division (on rates over 1000 messages / second) causes the existing implementation to sleep for 0 milliseconds, then call itself recursively; this causes what is essentially an infinite recursion, since the base case of the calculated amount of time having elapsed can't be reached before available stack space is exhausted. A fix to this truncation error is included in this patch.

However, even with the defect patched, the accuracy of the existing implementation is abysmal (the error bounds of the original test were effectively [-30%, +10%], although this fact was obscured by hard-coded error margins); as such, when the error bounds were tightened down to [-5%, +5%], the existing implementation failed to meet the new, tightened requirements. Therefore, an industry-vetted solution (from Guava) was used to get the adapted tests to pass.

Author: David McGuire <david.mcguire2@nike.com>

Closes #5559 from dmcguire81/master and squashes the following commits:

d29d2e0 [David McGuire] Back out to +/-5% error margins, for flexibility in timing
8be6934 [David McGuire] Fix spacing per code review
90e98b9 [David McGuire] Address scalastyle errors
29011bd [David McGuire] Further ratchet down the error margins
b33b796 [David McGuire] Eliminate dependency on even distribution by BlockGenerator
8f2934b [David McGuire] Remove arbitrary thread timing / cooperation code
70ee310 [David McGuire] Use Thread.yield(), since Thread.sleep(0) is system-dependent
82ee46d [David McGuire] Replace guard clause with nested conditional
2794717 [David McGuire] Replace the RateLimiter with the Guava implementation
38f3ca8 [David McGuire] Ratchet down the error rate to +/- 5%; tests fail
24b1bc0 [David McGuire] Fix truncation in integer division causing infinite recursion
d6e1079 [David McGuire] Stack overflow error in RateLimiter on rates over 1000/s

(cherry picked from commit 5fea3e5)
Signed-off-by: Sean Owen <sowen@cloudera.com>
1 parent 8549ff4 commit 948f2f6

File tree

2 files changed

+21
-41
lines changed

2 files changed

+21
-41
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.streaming.receiver
1919

2020
import org.apache.spark.{Logging, SparkConf}
21-
import java.util.concurrent.TimeUnit._
21+
import com.google.common.util.concurrent.{RateLimiter=>GuavaRateLimiter}
2222

2323
/** Provides waitToPush() method to limit the rate at which receivers consume data.
2424
*
@@ -33,37 +33,12 @@ import java.util.concurrent.TimeUnit._
3333
*/
3434
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
3535

36-
private var lastSyncTime = System.nanoTime
37-
private var messagesWrittenSinceSync = 0L
3836
private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
39-
private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
37+
private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)
4038

4139
def waitToPush() {
42-
if( desiredRate <= 0 ) {
43-
return
44-
}
45-
val now = System.nanoTime
46-
val elapsedNanosecs = math.max(now - lastSyncTime, 1)
47-
val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs
48-
if (rate < desiredRate) {
49-
// It's okay to write; just update some variables and return
50-
messagesWrittenSinceSync += 1
51-
if (now > lastSyncTime + SYNC_INTERVAL) {
52-
// Sync interval has passed; let's resync
53-
lastSyncTime = now
54-
messagesWrittenSinceSync = 1
55-
}
56-
} else {
57-
// Calculate how much time we should sleep to bring ourselves to the desired rate.
58-
val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate
59-
val elapsedTimeInMillis = elapsedNanosecs / 1000000
60-
val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
61-
if (sleepTimeInMillis > 0) {
62-
logTrace("Natural rate is " + rate + " per second but desired rate is " +
63-
desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.")
64-
Thread.sleep(sleepTimeInMillis)
65-
}
66-
waitToPush()
40+
if (desiredRate > 0) {
41+
rateLimiter.acquire()
6742
}
6843
}
6944
}

streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
158158
test("block generator throttling") {
159159
val blockGeneratorListener = new FakeBlockGeneratorListener
160160
val blockInterval = 100
161-
val maxRate = 100
161+
val maxRate = 1001
162162
val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
163163
set("spark.streaming.receiver.maxRate", maxRate.toString)
164164
val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
@@ -176,7 +176,6 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
176176
blockGenerator.addData(count)
177177
generatedData += count
178178
count += 1
179-
Thread.sleep(1)
180179
}
181180
blockGenerator.stop()
182181

@@ -185,25 +184,31 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
185184
assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
186185
assert(recordedData.toSet === generatedData.toSet, "Received data not same")
187186

188-
// recordedData size should be close to the expected rate
189-
val minExpectedMessages = expectedMessages - 3
190-
val maxExpectedMessages = expectedMessages + 1
187+
// recordedData size should be close to the expected rate; use an error margin proportional to
188+
// the value, so that rate changes don't cause a brittle test
189+
val minExpectedMessages = expectedMessages - 0.05 * expectedMessages
190+
val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages
191191
val numMessages = recordedData.size
192192
assert(
193193
numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
194194
s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
195195
)
196196

197-
val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3
198-
val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1
197+
// XXX Checking every block would require an even distribution of messages across blocks,
198+
// which throttling code does not control. Therefore, test against the average.
199+
val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
200+
val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
199201
val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
202+
203+
// the first and last block may be incomplete, so we slice them out
204+
val validBlocks = recordedBlocks.drop(1).dropRight(1)
205+
val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size
206+
200207
assert(
201-
// the first and last block may be incomplete, so we slice them out
202-
recordedBlocks.drop(1).dropRight(1).forall { block =>
203-
block.size >= minExpectedMessagesPerBlock && block.size <= maxExpectedMessagesPerBlock
204-
},
208+
averageBlockSize >= minExpectedMessagesPerBlock &&
209+
averageBlockSize <= maxExpectedMessagesPerBlock,
205210
s"# records in received blocks = [$receivedBlockSizes], not between " +
206-
s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock"
211+
s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average"
207212
)
208213
}
209214

0 commit comments

Comments
 (0)