Skip to content

Commit f37cfd8

Browse files
author
Dibyendu Bhattacharya
committed
[SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
1 parent 5a8344a commit f37cfd8

File tree

2 files changed

+109
-170
lines changed

2 files changed

+109
-170
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ private[streaming] trait ReceivedBlockHandler {
5454
* that stores the metadata related to storage of blocks using
5555
* [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
5656
*/
57-
private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId,
58-
numRecords: Option[Long])
57+
private[streaming] case class BlockManagerBasedStoreResult(
58+
blockId: StreamBlockId, numRecords: Option[Long])
5959
extends ReceivedBlockStoreResult
6060

6161

@@ -68,23 +68,21 @@ private[streaming] class BlockManagerBasedBlockHandler(
6868
extends ReceivedBlockHandler with Logging {
6969

7070
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
71+
7172
var numRecords = None: Option[Long]
7273

7374
val putResult: Seq[(BlockId, BlockStatus)] = block match {
7475
case ArrayBufferBlock(arrayBuffer) =>
75-
val countIterator = new CountingIterator(arrayBuffer.iterator)
76-
val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
76+
numRecords = Some(arrayBuffer.size.toLong)
77+
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
7778
tellMaster = true)
78-
numRecords = Some(countIterator.count)
79-
putResult
8079
case IteratorBlock(iterator) =>
8180
val countIterator = new CountingIterator(iterator)
8281
val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
8382
tellMaster = true)
84-
numRecords = Some(countIterator.count)
83+
numRecords = countIterator.count
8584
putResult
8685
case ByteBufferBlock(byteBuffer) =>
87-
numRecords = Some(1)
8886
blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
8987
case o =>
9088
throw new SparkException(
@@ -171,17 +169,14 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
171169
// Serialize the block so that it can be inserted into both
172170
val serializedBlock = block match {
173171
case ArrayBufferBlock(arrayBuffer) =>
174-
val countIterator = new CountingIterator(arrayBuffer.iterator)
175-
val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
176-
numRecords = Some(countIterator.count)
177-
serializedBlock
172+
numRecords = Some(arrayBuffer.size.toLong)
173+
blockManager.dataSerialize(blockId, arrayBuffer.iterator)
178174
case IteratorBlock(iterator) =>
179175
val countIterator = new CountingIterator(iterator)
180176
val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
181-
numRecords = Some(countIterator.count)
177+
numRecords = countIterator.count
182178
serializedBlock
183179
case ByteBufferBlock(byteBuffer) =>
184-
numRecords = Some(1)
185180
byteBuffer
186181
case _ =>
187182
throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
@@ -228,11 +223,18 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
228223
* A utility that will wrap the Iterator to get the count
229224
*/
230225
private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
231-
var count = 0
226+
private var _count = 0
227+
228+
private def isFullyConsumed: Boolean = !iterator.hasNext
229+
232230
def hasNext(): Boolean = iterator.hasNext
233-
def isFullyConsumed: Boolean = !iterator.hasNext
231+
232+
def count(): Option[Long] = {
233+
if (isFullyConsumed) Some(_count) else None
234+
}
235+
234236
def next(): T = {
235-
count += 1
237+
_count += 1
236238
iterator.next()
237239
}
238240
}

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala

Lines changed: 90 additions & 153 deletions
Original file line numberDiff line numberDiff line change
@@ -186,146 +186,62 @@ class ReceivedBlockHandlerSuite
186186
}
187187
}
188188

189-
test("BlockManagerBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count messages") {
190-
storageLevel = StorageLevel.MEMORY_ONLY
191-
// Create a non-trivial (not all zeros) byte array
192-
val bytes = Array.tabulate(100)(i => i.toByte)
193-
val byteBufferBlock = ByteBuffer.wrap(bytes)
194-
withBlockManagerBasedBlockHandler { handler =>
195-
val blockStoreResult = storeBlock(handler, ByteBufferBlock(byteBufferBlock))
196-
// ByteBufferBlock is counted as single record
197-
assert(blockStoreResult.numRecords === Some(1))
198-
}
199-
}
200-
201-
test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count messages") {
202-
storageLevel = StorageLevel.MEMORY_ONLY
203-
// Create a non-trivial (not all zeros) byte array
204-
val bytes = Array.tabulate(100)(i => i.toByte)
205-
val byteBufferBlock = ByteBuffer.wrap(bytes)
206-
withWriteAheadLogBasedBlockHandler { handler =>
207-
val blockStoreResult = storeBlock(handler, ByteBufferBlock(byteBufferBlock))
208-
// ByteBufferBlock is counted as single record
209-
assert(blockStoreResult.numRecords === Some(1))
210-
}
211-
}
212-
213-
test("BlockManagerBasedBlockHandler-MEMORY_ONLY-ArrayBufferBlock - count messages") {
214-
storageLevel = StorageLevel.MEMORY_ONLY
215-
val block = ArrayBuffer.fill(100)(0)
216-
withBlockManagerBasedBlockHandler { handler =>
217-
val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block))
218-
assert(blockStoreResult.numRecords === Some(100))
219-
}
220-
}
221-
222-
test("BlockManagerBasedBlockHandler-DISK_ONLY-ArrayBufferBlock - count messages") {
223-
storageLevel = StorageLevel.DISK_ONLY
224-
val block = ArrayBuffer.fill(100)(0)
225-
withBlockManagerBasedBlockHandler { handler =>
226-
val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block))
227-
assert(blockStoreResult.numRecords === Some(100))
228-
}
229-
}
230-
231-
test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK-ArrayBufferBlock - count messages") {
232-
storageLevel = StorageLevel.MEMORY_AND_DISK
233-
val block = ArrayBuffer.fill(100)(0)
234-
withBlockManagerBasedBlockHandler { handler =>
235-
val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block))
236-
assert(blockStoreResult.numRecords === Some(100))
237-
}
238-
}
239-
240-
test("BlockManagerBasedBlockHandler-MEMORY_ONLY-IteratorBlock - count messages") {
241-
storageLevel = StorageLevel.MEMORY_ONLY
242-
val block = ArrayBuffer.fill(100)(0)
243-
withBlockManagerBasedBlockHandler { handler =>
244-
val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator))
245-
assert(blockStoreResult.numRecords === Some(100))
246-
}
247-
}
248-
249-
test("BlockManagerBasedBlockHandler-DISK_ONLY-IteratorBlock - count messages") {
250-
storageLevel = StorageLevel.DISK_ONLY
251-
val block = ArrayBuffer.fill(100)(0)
252-
withBlockManagerBasedBlockHandler { handler =>
253-
val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator))
254-
assert(blockStoreResult.numRecords === Some(100))
255-
}
256-
}
257-
258-
test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK-IteratorBlock - count messages") {
259-
storageLevel = StorageLevel.MEMORY_AND_DISK
260-
val block = ArrayBuffer.fill(100)(0)
261-
withBlockManagerBasedBlockHandler { handler =>
262-
val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator))
263-
assert(blockStoreResult.numRecords === Some(100))
264-
}
265-
}
266-
267-
test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY-ArrayBufferBlock - count messages") {
268-
storageLevel = StorageLevel.MEMORY_ONLY
269-
val block = ArrayBuffer.fill(100)(0)
270-
withWriteAheadLogBasedBlockHandler { handler =>
271-
val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block))
272-
assert(blockStoreResult.numRecords === Some(100))
273-
}
274-
}
275-
276-
test("WriteAheadLogBasedBlockHandler-DISK_ONLY-ArrayBufferBlock - count messages") {
277-
storageLevel = StorageLevel.DISK_ONLY
278-
val block = ArrayBuffer.fill(100)(0)
279-
withWriteAheadLogBasedBlockHandler { handler =>
280-
val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block))
281-
assert(blockStoreResult.numRecords === Some(100))
282-
}
283-
}
284-
285-
test("WriteAheadLogBasedBlockHandler-MEMORY_AND_DISK-ArrayBufferBlock - count messages") {
286-
storageLevel = StorageLevel.MEMORY_AND_DISK
287-
val block = ArrayBuffer.fill(100)(0)
288-
withWriteAheadLogBasedBlockHandler { handler =>
289-
val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block))
290-
assert(blockStoreResult.numRecords === Some(100))
291-
}
292-
}
293-
294-
test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY-IteratorBlock - count messages") {
295-
storageLevel = StorageLevel.MEMORY_ONLY
296-
val block = ArrayBuffer.fill(100)(0)
297-
withWriteAheadLogBasedBlockHandler { handler =>
298-
val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator))
299-
assert(blockStoreResult.numRecords === Some(100))
300-
}
301-
}
302-
303-
test("WriteAheadLogBasedBlockHandler-DISK_ONLY-IteratorBlock - count messages ") {
304-
storageLevel = StorageLevel.DISK_ONLY
305-
val block = ArrayBuffer.fill(100)(0)
306-
withWriteAheadLogBasedBlockHandler { handler =>
307-
val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator))
308-
assert(blockStoreResult.numRecords === Some(100))
309-
}
310-
}
311-
312-
test("WriteAheadLogBasedBlockHandler-MEMORY_AND_DISK-IteratorBlock - count messages") {
313-
storageLevel = StorageLevel.MEMORY_AND_DISK
314-
val block = ArrayBuffer.fill(100)(0)
315-
withWriteAheadLogBasedBlockHandler { handler =>
316-
val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator))
317-
assert(blockStoreResult.numRecords === Some(100))
318-
}
319-
}
320-
321-
test("BlockManagerBasedBlockHandler - isFullyConsumed-MEMORY_ONLY") {
189+
test("BlockManagerBasedBlockHandler - count messages") {
190+
// ByteBufferBlock-MEMORY_ONLY
191+
testRecordcount(true, StorageLevel.MEMORY_ONLY,
192+
ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
193+
// ArrayBufferBlock-MEMORY_ONLY
194+
testRecordcount(true, StorageLevel.MEMORY_ONLY,
195+
ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
196+
// ArrayBufferBlock-DISK_ONLY
197+
testRecordcount(true, StorageLevel.DISK_ONLY,
198+
ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
199+
// ArrayBufferBlock-MEMORY_AND_DISK
200+
testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
201+
ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
202+
// IteratorBlock-MEMORY_ONLY
203+
testRecordcount(true, StorageLevel.MEMORY_ONLY,
204+
IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
205+
// IteratorBlock-DISK_ONLY
206+
testRecordcount(true, StorageLevel.DISK_ONLY,
207+
IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
208+
// IteratorBlock-MEMORY_AND_DISK
209+
testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
210+
IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
211+
}
212+
213+
test("WriteAheadLogBasedBlockHandler - count messages") {
214+
// ByteBufferBlock-MEMORY_ONLY
215+
testRecordcount(false, StorageLevel.MEMORY_ONLY,
216+
ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
217+
// ArrayBufferBlock-MEMORY_ONLY
218+
testRecordcount(false, StorageLevel.MEMORY_ONLY,
219+
ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
220+
// ArrayBufferBlock-DISK_ONLY
221+
testRecordcount(false, StorageLevel.DISK_ONLY,
222+
ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
223+
// ArrayBufferBlock-MEMORY_AND_DISK
224+
testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
225+
ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
226+
// IteratorBlock-MEMORY_ONLY
227+
testRecordcount(false, StorageLevel.MEMORY_ONLY,
228+
IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
229+
// IteratorBlock-DISK_ONLY
230+
testRecordcount(false, StorageLevel.DISK_ONLY,
231+
IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
232+
// IteratorBlock-MEMORY_AND_DISK
233+
testRecordcount(false, StorageLevel.MEMORY_AND_DISK,
234+
IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
235+
}
236+
237+
test("BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
322238
storageLevel = StorageLevel.MEMORY_ONLY
323239
blockManager = createBlockManager(12000)
324240
val block = List.fill(70)(new Array[Byte](100))
325241
// spark.storage.unrollFraction set to 0.4 for BlockManager
326-
// With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store
242+
// With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to store
327243
// this block With MEMORY_ONLY StorageLevel. BlockManager will not be able to unroll this block
328-
// and hence it will not tryToPut this block , resulting the SparkException
244+
// and hence it will not tryToPut this block, resulting in a SparkException
329245
withBlockManagerBasedBlockHandler { handler =>
330246
val thrown = intercept[SparkException] {
331247
val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator))
@@ -335,34 +251,55 @@ class ReceivedBlockHandlerSuite
335251
}
336252
}
337253

338-
test("BlockManagerBasedBlockHandler - isFullyConsumed-MEMORY_AND_DISK") {
339-
storageLevel = StorageLevel.MEMORY_AND_DISK
254+
test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK - isFullyConsumed") {
340255
blockManager = createBlockManager(12000)
341-
val block = List.fill(70)(new Array[Byte](100))
342256
// spark.storage.unrollFraction set to 0.4 for BlockManager
343-
// With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store
344-
// this block in MEMORY , But BlockManager will be able to sereliaze this block to DISK
257+
// With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to store
258+
// this block in MEMORY, but BlockManager will be able to serialize this block to DISK
345259
// and hence count returns correct value.
346-
withBlockManagerBasedBlockHandler { handler =>
347-
val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator))
348-
assert(blockStoreResult.numRecords === Some(70))
349-
}
260+
testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
261+
IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
350262
}
351263

352-
test("WriteAheadLogBasedBlockHandler - isFullyConsumed-MEMORY_ONLY") {
353-
storageLevel = StorageLevel.MEMORY_ONLY
264+
test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") {
354265
blockManager = createBlockManager(12000)
355-
val block = List.fill(70)(new Array[Byte](100))
356266
// spark.storage.unrollFraction set to 0.4 for BlockManager
357-
// With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store
358-
// this block in MEMORY , But BlockManager will be able to sereliaze this block to WAL
267+
// With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to store
268+
// this block in MEMORY, but BlockManager will be able to serialize this block to WAL
359269
// and hence count returns correct value.
360-
withWriteAheadLogBasedBlockHandler { handler =>
361-
val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator))
362-
assert(blockStoreResult.numRecords === Some(70))
363-
}
270+
testRecordcount(false, StorageLevel.MEMORY_ONLY,
271+
IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
364272
}
365273

274+
/**
275+
* Test storing of data using different types of Handler, StorageLevel and ReceivedBlocks
276+
* and verify the correct record count
277+
*/
278+
private def testRecordcount(isBlockManagedBasedBlockHandler: Boolean,
279+
sLevel: StorageLevel,
280+
receivedBlock: ReceivedBlock,
281+
bManager: BlockManager,
282+
expectedNumRecords: Option[Long]
283+
) {
284+
storageLevel = sLevel
285+
blockManager = bManager
286+
if (isBlockManagedBasedBlockHandler) {
287+
// test received block with BlockManager based handler
288+
withBlockManagerBasedBlockHandler { handler =>
289+
val blockStoreResult = storeBlock(handler, receivedBlock)
290+
assert(blockStoreResult.numRecords === expectedNumRecords)
291+
}
292+
} else {
293+
// test received block with WAL based handler
294+
withWriteAheadLogBasedBlockHandler { handler =>
295+
val blockStoreResult = storeBlock(handler, receivedBlock)
296+
assert(blockStoreResult.numRecords === expectedNumRecords)
297+
}
298+
}
299+
// Removing the Block Id to use same blockManager for next test
300+
blockManager.removeBlock(StreamBlockId(streamId, 1000L), true)
301+
}
302+
366303
/**
367304
* Test storing of data using different forms of ReceivedBlocks and verify that they succeeded
368305
* using the given verification function

0 commit comments

Comments
 (0)