Commit 210f495

Revert "Added a few tests that measure the receiver’s rate."
This reverts commit 0c51959.
1 parent: 0c51959 · commit: 210f495

File tree: 3 files changed (+75, −130 lines)

streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala

Lines changed: 74 additions & 22 deletions
@@ -155,6 +155,63 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     assert(recordedData.toSet === generatedData.toSet)
   }
 
+  ignore("block generator throttling") {
+    val blockGeneratorListener = new FakeBlockGeneratorListener
+    val blockIntervalMs = 100
+    val maxRate = 1001
+    val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms").
+      set("spark.streaming.receiver.maxRate", maxRate.toString)
+    val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
+    val expectedBlocks = 20
+    val waitTime = expectedBlocks * blockIntervalMs
+    val expectedMessages = maxRate * waitTime / 1000
+    val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000
+    val generatedData = new ArrayBuffer[Int]
+
+    // Generate blocks
+    val startTime = System.currentTimeMillis()
+    blockGenerator.start()
+    var count = 0
+    while(System.currentTimeMillis - startTime < waitTime) {
+      blockGenerator.addData(count)
+      generatedData += count
+      count += 1
+    }
+    blockGenerator.stop()
+
+    val recordedBlocks = blockGeneratorListener.arrayBuffers
+    val recordedData = recordedBlocks.flatten
+    assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
+    assert(recordedData.toSet === generatedData.toSet, "Received data not same")
+
+    // recordedData size should be close to the expected rate; use an error margin proportional to
+    // the value, so that rate changes don't cause a brittle test
+    val minExpectedMessages = expectedMessages - 0.05 * expectedMessages
+    val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages
+    val numMessages = recordedData.size
+    assert(
+      numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
+      s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
+    )
+
+    // XXX Checking every block would require an even distribution of messages across blocks,
+    // which throttling code does not control. Therefore, test against the average.
+    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
+    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
+    val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
+
+    // the first and last block may be incomplete, so we slice them out
+    val validBlocks = recordedBlocks.drop(1).dropRight(1)
+    val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size
+
+    assert(
+      averageBlockSize >= minExpectedMessagesPerBlock &&
+        averageBlockSize <= maxExpectedMessagesPerBlock,
+      s"# records in received blocks = [$receivedBlockSizes], not between " +
+        s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average"
+    )
+  }
+
   /**
    * Test whether write ahead logs are generated by received,
    * and automatically cleaned up. The clean up must be aware of the
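
Note: with the constants in the restored (but ignore-d) test above, the asserted bounds work out to 2002 expected records in total and about 100 per block, each with a proportional 5% margin. The standalone sketch below only restates that arithmetic; the object name ThrottlingBoundsSketch is made up for illustration and is not part of the suite.

object ThrottlingBoundsSketch extends App {
  // Constants copied from the ignored test above.
  val maxRate = 1001        // records per second
  val blockIntervalMs = 100 // one block roughly every 100 ms
  val expectedBlocks = 20

  val waitTime = expectedBlocks * blockIntervalMs                  // 2000 ms of data generation
  val expectedMessages = maxRate * waitTime / 1000                 // 1001 * 2000 / 1000 = 2002
  val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000  // 1001 * 100 / 1000 = 100

  // The assertions allow a 5% error margin proportional to each figure.
  println(s"total records expected in [${expectedMessages * 0.95}, ${expectedMessages * 1.05}]")
  println(s"per-block average expected in [${expectedMessagesPerBlock * 0.95}, ${expectedMessagesPerBlock * 1.05}]")
}
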
@@ -290,33 +347,28 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
       errors += throwable
     }
   }
-}
 
-/**
- * An implementation of BlockGeneratorListener that is used to test the BlockGenerator.
- */
-class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener {
-  // buffer of data received as ArrayBuffers
-  val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
-  val errors = new ArrayBuffer[Throwable]
-
-  def onAddData(data: Any, metadata: Any) {}
+  /**
+   * An implementation of BlockGeneratorListener that is used to test the BlockGenerator.
+   */
+  class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener {
+    // buffer of data received as ArrayBuffers
+    val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
+    val errors = new ArrayBuffer[Throwable]
 
-  def onGenerateBlock(blockId: StreamBlockId) {}
+    def onAddData(data: Any, metadata: Any) { }
 
-  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
-    val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
-    arrayBuffers += bufferOfInts
-    Thread.sleep(0)
-  }
+    def onGenerateBlock(blockId: StreamBlockId) { }
 
-  def onError(message: String, throwable: Throwable) {
-    errors += throwable
-  }
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
+      val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
+      arrayBuffers += bufferOfInts
+      Thread.sleep(0)
+    }
 
-  def reset(): Unit = {
-    arrayBuffers.clear()
-    errors.clear()
+    def onError(message: String, throwable: Throwable) {
+      errors += throwable
+    }
   }
 }

streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala

Lines changed: 0 additions & 108 deletions
@@ -17,12 +17,8 @@
 
 package org.apache.spark.streaming.receiver
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.SparkConf
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.FakeBlockGeneratorListener
 
 /** Testsuite for testing the network receiver behavior */
 class RateLimiterSuite extends SparkFunSuite {
@@ -47,108 +43,4 @@ class RateLimiterSuite extends SparkFunSuite {
     rateLimiter.updateRate(105)
     assert(rateLimiter.getCurrentLimit === 100)
   }
-
-  def setupGenerator(blockInterval: Int): (BlockGenerator, FakeBlockGeneratorListener) = {
-    val blockGeneratorListener = new FakeBlockGeneratorListener
-    val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockInterval}ms")
-    val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
-    (blockGenerator, blockGeneratorListener)
-  }
-
-  test("throttling block generator") {
-    val blockIntervalMs = 100
-    val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs)
-    val maxRate = 1000
-    blockGenerator.updateRate(maxRate)
-    blockGenerator.start()
-    throttlingTest(maxRate, blockGenerator, blockGeneratorListener, blockIntervalMs)
-    blockGenerator.stop()
-  }
-
-  test("throttling block generator changes rate up") {
-    val blockIntervalMs = 100
-    val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs)
-    val maxRate1 = 1000
-    blockGenerator.start()
-    blockGenerator.updateRate(maxRate1)
-    throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs)
-
-    blockGeneratorListener.reset()
-    val maxRate2 = 5000
-    blockGenerator.updateRate(maxRate2)
-    throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs)
-    blockGenerator.stop()
-  }
-
-  test("throttling block generator changes rate up and down") {
-    val blockIntervalMs = 100
-    val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs)
-    val maxRate1 = 1000
-    blockGenerator.updateRate(maxRate1)
-    blockGenerator.start()
-    throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs)
-
-    blockGeneratorListener.reset()
-    val maxRate2 = 5000
-    blockGenerator.updateRate(maxRate2)
-    throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs)
-
-    blockGeneratorListener.reset()
-    val maxRate3 = 1000
-    blockGenerator.updateRate(maxRate3)
-    throttlingTest(maxRate3, blockGenerator, blockGeneratorListener, blockIntervalMs)
-    blockGenerator.stop()
-  }
-
-  def throttlingTest(
-      maxRate: Long,
-      blockGenerator: BlockGenerator,
-      blockGeneratorListener: FakeBlockGeneratorListener,
-      blockIntervalMs: Int) {
-    val expectedBlocks = 20
-    val waitTime = expectedBlocks * blockIntervalMs
-    val expectedMessages = maxRate * waitTime / 1000
-    val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000
-    val generatedData = new ArrayBuffer[Int]
-
-    // Generate blocks
-    val startTime = System.currentTimeMillis()
-    var count = 0
-    while(System.currentTimeMillis - startTime < waitTime) {
-      blockGenerator.addData(count)
-      generatedData += count
-      count += 1
-    }
-
-    val recordedBlocks = blockGeneratorListener.arrayBuffers
-    val recordedData = recordedBlocks.flatten
-    assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
-
-    // recordedData size should be close to the expected rate; use an error margin proportional to
-    // the value, so that rate changes don't cause a brittle test
-    val minExpectedMessages = expectedMessages - 0.05 * expectedMessages
-    val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages
-    val numMessages = recordedData.size
-    assert(
-      numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
-      s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
-    )
-
-    // XXX Checking every block would require an even distribution of messages across blocks,
-    // which throttling code does not control. Therefore, test against the average.
-    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
-    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
-    val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
-
-    // the first and last block may be incomplete, so we slice them out
-    val validBlocks = recordedBlocks.drop(1).dropRight(1)
-    val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size
-
-    assert(
-      averageBlockSize >= minExpectedMessagesPerBlock &&
-        averageBlockSize <= maxExpectedMessagesPerBlock,
-      s"# records in received blocks = [$receivedBlockSizes], not between " +
-        s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average"
-    )
-  }
 }
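
The assertions that remain in RateLimiterSuite (visible in the context lines of this hunk) only exercise clamping of dynamic rate updates: after updateRate(105), getCurrentLimit stays at 100, a maximum presumably configured earlier in the test, outside this hunk. The sketch below is a deliberately simplified, hypothetical model of that clamping behaviour, not Spark's actual RateLimiter implementation, and the class and object names are invented for illustration.

// Hypothetical stand-in for the clamping behaviour the remaining tests exercise.
class CappedRateLimiter(maxRate: Long) {
  private var currentLimit: Long = maxRate

  // Positive requested rates are applied, but never above the configured maximum.
  def updateRate(newRate: Long): Unit =
    if (newRate > 0) currentLimit = math.min(newRate, maxRate)

  def getCurrentLimit: Long = currentLimit
}

object CappedRateLimiterDemo extends App {
  val rateLimiter = new CappedRateLimiter(maxRate = 100)
  rateLimiter.updateRate(105)
  assert(rateLimiter.getCurrentLimit == 100) // clamped to the configured maximum
  rateLimiter.updateRate(50)
  assert(rateLimiter.getCurrentLimit == 50)  // updates below the maximum take effect
}
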

streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.receiver._
 import org.apache.spark.util.Utils
 import org.apache.spark.streaming.dstream.InputDStream
+import scala.reflect.ClassTag
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 /** Testsuite for receiver scheduling */
