File tree Expand file tree Collapse file tree 1 file changed +9
-5
lines changed
streaming/src/test/scala/org/apache/spark/streaming Expand file tree Collapse file tree 1 file changed +9
-5
lines changed Original file line number Diff line number Diff line change @@ -194,16 +194,20 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
194194 s " #records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
195195 )
196196
197+ // XXX Checking every block would require an even distribution of messages across blocks,
198+ // which throttling code does not control. Therefore, test against the average.
197199 val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
198200 val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
199201 val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(" ," )
202+
203+ // the first and last block may be incomplete, so we slice them out
204+ val validBlocks = recordedBlocks.drop(1 ).dropRight(1 )
205+ val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size
206+
200207 assert(
201- // the first and last block may be incomplete, so we slice them out
202- recordedBlocks.drop(1 ).dropRight(1 ).forall { block =>
203- block.size >= minExpectedMessagesPerBlock && block.size <= maxExpectedMessagesPerBlock
204- },
208+ averageBlockSize >= minExpectedMessagesPerBlock && averageBlockSize <= maxExpectedMessagesPerBlock,
205209 s " # records in received blocks = [ $receivedBlockSizes], not between " +
206- s " $minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock"
210+ s " $minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average "
207211 )
208212 }
209213
You can’t perform that action at this time.
0 commit comments