
Commit 3eaed87

Dibyendu Bhattacharya authored and tdas committed
[SPARK-8080] [STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
[SPARK-8080] [STREAMING] Receiver.store with Iterator does not give correct count at Spark UI

tdas zsxwing This is the new PR for SPARK-8080; I have merged #6659.

Also note: for MEMORY_ONLY settings, when a block cannot be safely unrolled to memory because there is not enough space, the BlockManager won't try to put the block, and ReceivedBlockHandler will throw a SparkException since it cannot find the block id in the PutResult. Thus the number of records in a block won't be counted if the block failed to unroll in memory, which is fine.

For MEMORY_AND_DISK settings, if the BlockManager is not able to unroll the block to memory, the block will still get serialized to disk; the same holds for the WAL-based store. So in those cases (storage level = memory + disk) the number of records will be counted even though the block could not be unrolled to memory. I therefore added isFullyConsumed to the CountingIterator, but it is unused, since it can never happen that a block is not fully consumed while ReceivedBlockHandler still gets the block id. I have also added a few test cases to cover those block-unrolling scenarios.

Author: Dibyendu Bhattacharya <dibyendu.bhattacharya1@pearson.com>
Author: U-PEROOT\UBHATD1 <UBHATD1@PIN-L-PI046.PEROOT.com>

Closes #6707 from dibbhatt/master and squashes the following commits:

f6cb6b5 [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
f37cfd8 [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
5a8344a [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI Count ByteBufferBlock as 1 count
fceac72 [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
0153e7e [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI Fixed comments given by @zsxwing
4c5931d [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
01e6dc8 [U-PEROOT\UBHATD1] A
1 parent 4ce3bab commit 3eaed87
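For context, the code path this commit fixes can be sketched with a minimal, hypothetical custom receiver (the class name and data below are illustrative; only Receiver.store(Iterator) is from the Spark Streaming API):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver, for illustration only.
class WordReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {
  def onStart(): Unit = {
    // store(Iterator) hands an IteratorBlock to the ReceivedBlockHandler.
    // Before this fix, numRecords for an IteratorBlock was None, so the
    // Spark UI reported no records for blocks stored this way.
    store(Iterator("apple", "banana", "cherry"))
  }
  def onStop(): Unit = { }
}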

File tree

4 files changed: +194 -22 lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala

Lines changed: 46 additions & 7 deletions
@@ -32,7 +32,10 @@ import org.apache.spark.{Logging, SparkConf, SparkException}

 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
-  def blockId: StreamBlockId // Any implementation of this trait will store a block id
+  // Any implementation of this trait will store a block id
+  def blockId: StreamBlockId
+  // Any implementation of this trait will have to return the number of records
+  def numRecords: Option[Long]
 }

 /** Trait that represents a class that handles the storage of blocks received by receiver */
@@ -51,7 +54,8 @@ private[streaming] trait ReceivedBlockHandler {
  * that stores the metadata related to storage of blocks using
  * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
  */
-private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
+private[streaming] case class BlockManagerBasedStoreResult(
+    blockId: StreamBlockId, numRecords: Option[Long])
   extends ReceivedBlockStoreResult

@@ -64,11 +68,20 @@ private[streaming] class BlockManagerBasedBlockHandler(
   extends ReceivedBlockHandler with Logging {

   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+
+    var numRecords = None: Option[Long]
+
     val putResult: Seq[(BlockId, BlockStatus)] = block match {
       case ArrayBufferBlock(arrayBuffer) =>
-        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
+        numRecords = Some(arrayBuffer.size.toLong)
+        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
+          tellMaster = true)
       case IteratorBlock(iterator) =>
-        blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
+        val countIterator = new CountingIterator(iterator)
+        val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
+          tellMaster = true)
+        numRecords = countIterator.count
+        putResult
       case ByteBufferBlock(byteBuffer) =>
         blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
       case o =>
@@ -79,7 +92,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
     }
-    BlockManagerBasedStoreResult(blockId)
+    BlockManagerBasedStoreResult(blockId, numRecords)
   }

   def cleanupOldBlocks(threshTime: Long) {
@@ -96,6 +109,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
  */
 private[streaming] case class WriteAheadLogBasedStoreResult(
     blockId: StreamBlockId,
+    numRecords: Option[Long],
     walRecordHandle: WriteAheadLogRecordHandle
   ) extends ReceivedBlockStoreResult

@@ -151,12 +165,17 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
   */
  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

+    var numRecords = None: Option[Long]
    // Serialize the block so that it can be inserted into both
    val serializedBlock = block match {
      case ArrayBufferBlock(arrayBuffer) =>
+        numRecords = Some(arrayBuffer.size.toLong)
        blockManager.dataSerialize(blockId, arrayBuffer.iterator)
      case IteratorBlock(iterator) =>
-        blockManager.dataSerialize(blockId, iterator)
+        val countIterator = new CountingIterator(iterator)
+        val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
+        numRecords = countIterator.count
+        serializedBlock
      case ByteBufferBlock(byteBuffer) =>
        byteBuffer
      case _ =>
@@ -181,7 +200,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
    // Combine the futures, wait for both to complete, and return the write ahead log record handle
    val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
    val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
-    WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
+    WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
  }

  def cleanupOldBlocks(threshTime: Long) {
@@ -199,3 +218,23 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
    new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
  }
 }
+
+/**
+ * A utility that will wrap the Iterator to get the count
+ */
+private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
+  private var _count = 0
+
+  private def isFullyConsumed: Boolean = !iterator.hasNext
+
+  def hasNext(): Boolean = iterator.hasNext
+
+  def count(): Option[Long] = {
+    if (isFullyConsumed) Some(_count) else None
+  }
+
+  def next(): T = {
+    _count += 1
+    iterator.next()
+  }
+}
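To make the semantics of the new wrapper explicit, here is a self-contained sketch that mirrors the CountingIterator added above (class name and method shapes lightly adapted so it compiles standalone): count is only defined once the wrapped iterator has been fully drained, which is exactly what BlockManager.putIterator and dataSerialize do before the count is read.

class CountingIteratorSketch[T](iterator: Iterator[T]) extends Iterator[T] {
  private var _count = 0L

  private def isFullyConsumed: Boolean = !iterator.hasNext

  def hasNext: Boolean = iterator.hasNext

  // None until the underlying iterator is exhausted, then the exact count
  def count: Option[Long] = if (isFullyConsumed) Some(_count) else None

  def next(): T = {
    _count += 1
    iterator.next()
  }
}

val it = new CountingIteratorSketch(Iterator(1, 2, 3))
assert(it.count.isEmpty)       // nothing consumed yet: no reliable count
it.foreach(_ => ())            // the put/serialize path drains the iterator
assert(it.count == Some(3L))   // fully consumed: exact record count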

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 1 addition & 6 deletions
@@ -137,15 +137,10 @@ private[streaming] class ReceiverSupervisorImpl(
       blockIdOption: Option[StreamBlockId]
     ) {
     val blockId = blockIdOption.getOrElse(nextBlockId)
-    val numRecords = receivedBlock match {
-      case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong)
-      case _ => None
-    }
-
     val time = System.currentTimeMillis
     val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
-
+    val numRecords = blockStoreResult.numRecords
     val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
     trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
     logDebug(s"Reported block $blockId")
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala

Lines changed: 146 additions & 8 deletions
@@ -49,18 +49,19 @@ class ReceivedBlockHandlerSuite

   val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
   val hadoopConf = new Configuration()
-  val storageLevel = StorageLevel.MEMORY_ONLY_SER
   val streamId = 1
   val securityMgr = new SecurityManager(conf)
   val mapOutputTracker = new MapOutputTrackerMaster(conf)
   val shuffleManager = new HashShuffleManager(conf)
   val serializer = new KryoSerializer(conf)
   val manualClock = new ManualClock
   val blockManagerSize = 10000000
+  val blockManagerBuffer = new ArrayBuffer[BlockManager]()

   var rpcEnv: RpcEnv = null
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
+  var storageLevel: StorageLevel = null
   var tempDirectory: File = null

   before {
@@ -70,20 +71,21 @@ class ReceivedBlockHandlerSuite
     blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)

-    blockManager = new BlockManager("bm", rpcEnv, blockManagerMaster, serializer,
-      blockManagerSize, conf, mapOutputTracker, shuffleManager,
-      new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
-    blockManager.initialize("app-id")
+    storageLevel = StorageLevel.MEMORY_ONLY_SER
+    blockManager = createBlockManager(blockManagerSize, conf)

     tempDirectory = Utils.createTempDir()
     manualClock.setTime(0)
   }

   after {
-    if (blockManager != null) {
-      blockManager.stop()
-      blockManager = null
+    for ( blockManager <- blockManagerBuffer ) {
+      if (blockManager != null) {
+        blockManager.stop()
+      }
     }
+    blockManager = null
+    blockManagerBuffer.clear()
     if (blockManagerMaster != null) {
       blockManagerMaster.stop()
       blockManagerMaster = null
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
     }
   }

+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
+    // Test count with WriteAheadLogBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(false)
+  }
+
+  test("Test Block - isFullyConsumed") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    sparkConf.set("spark.storage.unrollFraction", "0.4")
+    // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
+    blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in MEMORY,
+    // but the BlockManager will be able to serialize this block to the WAL,
+    // so count returns the correct value.
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block in MEMORY,
+    // but the BlockManager will be able to serialize this block to DISK,
+    // so count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))

+    // There is not enough space to store this block with the MEMORY_ONLY StorageLevel.
+    // The BlockManager will not be able to unroll this block
+    // and hence will not tryToPut this block, resulting in a SparkException.
+    storageLevel = StorageLevel.MEMORY_ONLY
+    withBlockManagerBasedBlockHandler { handler =>
+      val thrown = intercept[SparkException] {
+        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      }
+    }
+  }
+
+  private def testCountWithBlockManagerBasedBlockHandler(isBlockManagerBasedBlockHandler: Boolean) {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ByteBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  private def createBlockManager(
+      maxMem: Long,
+      conf: SparkConf,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+    val transfer = new NioBlockTransferService(conf, securityMgr)
+    val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+    manager.initialize("app-id")
+    blockManagerBuffer += manager
+    manager
+  }
+
+  /**
+   * Test storing of data using different types of Handler, StorageLevel and ReceivedBlocks
+   * and verify the correct record count
+   */
+  private def testRecordcount(isBlockManagedBasedBlockHandler: Boolean,
+      sLevel: StorageLevel,
+      receivedBlock: ReceivedBlock,
+      bManager: BlockManager,
+      expectedNumRecords: Option[Long]
+    ) {
+    blockManager = bManager
+    storageLevel = sLevel
+    var bId: StreamBlockId = null
+    try {
+      if (isBlockManagedBasedBlockHandler) {
+        // test received block with BlockManager based handler
+        withBlockManagerBasedBlockHandler { handler =>
+          val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock)
+          bId = blockId
+          assert(blockStoreResult.numRecords === expectedNumRecords,
+            "Message count does not match for a " +
+            receivedBlock.getClass.getName +
+            " being inserted using BlockManagerBasedBlockHandler with " + sLevel)
+        }
+      } else {
+        // test received block with WAL based handler
+        withWriteAheadLogBasedBlockHandler { handler =>
+          val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock)
+          bId = blockId
+          assert(blockStoreResult.numRecords === expectedNumRecords,
+            "Message count does not match for a " +
+            receivedBlock.getClass.getName +
+            " being inserted using WriteAheadLogBasedBlockHandler with " + sLevel)
+        }
+      }
+    } finally {
+      // Remove the block so the same blockManager can be reused by the next test
+      blockManager.removeBlock(bId, true)
+    }
+  }
+
   /**
    * Test storing of data using different forms of ReceivedBlocks and verify that they succeeded
    * using the given verification function
@@ -251,9 +377,21 @@ class ReceivedBlockHandlerSuite
     (blockIds, storeResults)
   }

+  /** Store single block using a handler */
+  private def storeSingleBlock(
+      handler: ReceivedBlockHandler,
+      block: ReceivedBlock
+    ): (StreamBlockId, ReceivedBlockStoreResult) = {
+    val blockId = generateBlockId
+    val blockStoreResult = handler.storeBlock(blockId, block)
+    logDebug("Done inserting")
+    (blockId, blockStoreResult)
+  }
+
   private def getWriteAheadLogFiles(): Seq[String] = {
     getLogFilesInDirectory(checkpointDirToLogDir(tempDirectory.toString, streamId))
   }

   private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong)
 }
+
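A quick back-of-envelope check of the sizes chosen in "Test Block - isFullyConsumed" above (figures taken from the test comments; per-object overhead ignored):

// Rough arithmetic behind the unroll-failure scenario in the test
val maxMem = 12000L                        // BlockManager size used in the test
val unrollBudget = (maxMem * 0.4).toLong   // spark.storage.unrollFraction = 0.4 -> 4800 bytes
val payload = 70 * 100                     // 70 records x 100 bytes = 7000 bytes of raw data
assert(payload > unrollBudget)             // so a MEMORY_ONLY put cannot unroll the block,
                                           // while MEMORY_AND_DISK and the WAL path still store it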

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -225,7 +225,7 @@ class ReceivedBlockTrackerSuite
   /** Generate blocks infos using random ids */
   def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
     List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
-      BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
+      BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
   }

   /** Get all the data written in the given write ahead log file. */
