Skip to content

Commit 0153e7e

Browse files
author
Dibyendu Bhattacharya
committed
[SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
Addressed review comments from @zsxwing
1 parent 4c5931d commit 0153e7e

File tree

2 files changed

+21
-25
lines changed

2 files changed

+21
-25
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,20 @@ private[streaming] class BlockManagerBasedBlockHandler(
6969

7070
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
7171
var numRecords = None: Option[Long]
72-
val countIterator = block match {
73-
case ArrayBufferBlock(arrayBuffer) => new CountingIterator(arrayBuffer.iterator)
74-
case IteratorBlock(iterator) => new CountingIterator(iterator)
75-
case _ => null
76-
}
72+
7773
val putResult: Seq[(BlockId, BlockStatus)] = block match {
7874
case ArrayBufferBlock(arrayBuffer) =>
79-
blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true)
75+
val countIterator = new CountingIterator(arrayBuffer.iterator)
76+
val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
77+
tellMaster = true)
78+
numRecords = Some(countIterator.count)
79+
putResult
8080
case IteratorBlock(iterator) =>
81-
blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true)
81+
val countIterator = new CountingIterator(iterator)
82+
val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
83+
tellMaster = true)
84+
numRecords = Some(countIterator.count)
85+
putResult
8286
case ByteBufferBlock(byteBuffer) =>
8387
blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
8488
case o =>
@@ -89,9 +93,6 @@ private[streaming] class BlockManagerBasedBlockHandler(
8993
throw new SparkException(
9094
s"Could not store $blockId to block manager with storage level $storageLevel")
9195
}
92-
if(countIterator != null) {
93-
numRecords = Some(countIterator.count)
94-
}
9596
BlockManagerBasedStoreResult(blockId, numRecords)
9697
}
9798

@@ -166,17 +167,18 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
166167
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
167168

168169
var numRecords = None: Option[Long]
169-
val countIterator = block match {
170-
case ArrayBufferBlock(arrayBuffer) => new CountingIterator(arrayBuffer.iterator)
171-
case IteratorBlock(iterator) => new CountingIterator(iterator)
172-
case _ => null
173-
}
174170
// Serialize the block so that it can be inserted into both the block manager and the WAL
175171
val serializedBlock = block match {
176172
case ArrayBufferBlock(arrayBuffer) =>
177-
blockManager.dataSerialize(blockId, countIterator)
173+
val countIterator = new CountingIterator(arrayBuffer.iterator)
174+
val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
175+
numRecords = Some(countIterator.count)
176+
serializedBlock
178177
case IteratorBlock(iterator) =>
179-
blockManager.dataSerialize(blockId, countIterator)
178+
val countIterator = new CountingIterator(iterator)
179+
val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
180+
numRecords = Some(countIterator.count)
181+
serializedBlock
180182
case ByteBufferBlock(byteBuffer) =>
181183
byteBuffer
182184
case _ =>
@@ -201,9 +203,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
201203
// Combine the futures, wait for both to complete, and return the write ahead log record handle
202204
val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
203205
val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
204-
if(countIterator != null) {
205-
numRecords = Some(countIterator.count)
206-
}
207206
WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
208207
}
209208

@@ -226,7 +225,7 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
226225
/**
227226
* A utility that will wrap the Iterator to get the count
228227
*/
229-
private class CountingIterator[T: Manifest](iterator: Iterator[T]) extends Iterator[T] {
228+
private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
230229
var count = 0
231230
def hasNext(): Boolean = iterator.hasNext
232231
def isFullyConsumed: Boolean = !iterator.hasNext

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ class ReceivedBlockHandlerSuite
6060
var rpcEnv: RpcEnv = null
6161
var blockManagerMaster: BlockManagerMaster = null
6262
var blockManager: BlockManager = null
63-
var handler: ReceivedBlockHandler = null
6463
var tempDirectory: File = null
6564
var storageLevel = StorageLevel.MEMORY_ONLY_SER
6665

@@ -190,9 +189,7 @@ class ReceivedBlockHandlerSuite
190189
test("BlockManagerBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count messages") {
191190
storageLevel = StorageLevel.MEMORY_ONLY
192191
// Create a non-trivial (not all zeros) byte array
193-
var counter = 0.toByte
194-
def incr: Byte = {counter = (counter + 1).toByte; counter;}
195-
val bytes = Array.fill[Byte](100)(incr)
192+
val bytes = Array.tabulate(100)(i => i.toByte)
196193
val byteBufferBlock = ByteBuffer.wrap(bytes)
197194
withBlockManagerBasedBlockHandler { handler =>
198195
val blockStoreResult = storeBlock(handler, ByteBufferBlock(byteBufferBlock))

0 commit comments

Comments
 (0)