-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI #6707
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
01e6dc8
4c5931d
0153e7e
fceac72
5a8344a
f37cfd8
f6cb6b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,7 +32,10 @@ import org.apache.spark.{Logging, SparkConf, SparkException} | |
|
||
/** Trait that represents the metadata related to storage of blocks */ | ||
private[streaming] trait ReceivedBlockStoreResult { | ||
def blockId: StreamBlockId // Any implementation of this trait will store a block id | ||
// Any implementation of this trait will store a block id | ||
def blockId: StreamBlockId | ||
// Any implementation of this trait will have to return the number of records | ||
def numRecords: Option[Long] | ||
} | ||
|
||
/** Trait that represents a class that handles the storage of blocks received by receiver */ | ||
|
@@ -51,7 +54,8 @@ private[streaming] trait ReceivedBlockHandler { | |
* that stores the metadata related to storage of blocks using | ||
* [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]] | ||
*/ | ||
private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId) | ||
private[streaming] case class BlockManagerBasedStoreResult( | ||
blockId: StreamBlockId, numRecords: Option[Long]) | ||
extends ReceivedBlockStoreResult | ||
|
||
|
||
|
@@ -64,11 +68,20 @@ private[streaming] class BlockManagerBasedBlockHandler( | |
extends ReceivedBlockHandler with Logging { | ||
|
||
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { | ||
|
||
var numRecords = None: Option[Long] | ||
|
||
val putResult: Seq[(BlockId, BlockStatus)] = block match { | ||
case ArrayBufferBlock(arrayBuffer) => | ||
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true) | ||
numRecords = Some(arrayBuffer.size.toLong) | ||
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, | ||
tellMaster = true) | ||
case IteratorBlock(iterator) => | ||
blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true) | ||
val countIterator = new CountingIterator(iterator) | ||
val putResult = blockManager.putIterator(blockId, countIterator, storageLevel, | ||
tellMaster = true) | ||
numRecords = countIterator.count | ||
putResult | ||
case ByteBufferBlock(byteBuffer) => | ||
blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true) | ||
case o => | ||
|
@@ -79,7 +92,7 @@ private[streaming] class BlockManagerBasedBlockHandler( | |
throw new SparkException( | ||
s"Could not store $blockId to block manager with storage level $storageLevel") | ||
} | ||
BlockManagerBasedStoreResult(blockId) | ||
BlockManagerBasedStoreResult(blockId, numRecords) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But how can we count a ByteBufferBlock? If you count one block as 1 message, that is also wrong. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, technically it is a single record - though I agree that is not exactly right either, but it must count as at least 1, correct? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The bytebuffer is expected to have multiple serialized records in it, so I think it is more wrong to count it as 1. So None is better than Some(1) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both are wrong but counting it as zero means we end up suggesting no data was received. I personally think they should be counted as 1, we can doc it |
||
} | ||
|
||
def cleanupOldBlocks(threshTime: Long) { | ||
|
@@ -96,6 +109,7 @@ private[streaming] class BlockManagerBasedBlockHandler( | |
*/ | ||
private[streaming] case class WriteAheadLogBasedStoreResult( | ||
blockId: StreamBlockId, | ||
numRecords: Option[Long], | ||
walRecordHandle: WriteAheadLogRecordHandle | ||
) extends ReceivedBlockStoreResult | ||
|
||
|
@@ -151,12 +165,17 @@ private[streaming] class WriteAheadLogBasedBlockHandler( | |
*/ | ||
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { | ||
|
||
var numRecords = None: Option[Long] | ||
// Serialize the block so that it can be inserted into both | ||
val serializedBlock = block match { | ||
case ArrayBufferBlock(arrayBuffer) => | ||
numRecords = Some(arrayBuffer.size.toLong) | ||
blockManager.dataSerialize(blockId, arrayBuffer.iterator) | ||
case IteratorBlock(iterator) => | ||
blockManager.dataSerialize(blockId, iterator) | ||
val countIterator = new CountingIterator(iterator) | ||
val serializedBlock = blockManager.dataSerialize(blockId, countIterator) | ||
numRecords = countIterator.count | ||
serializedBlock | ||
case ByteBufferBlock(byteBuffer) => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same issue as before. How did we count |
||
byteBuffer | ||
case _ => | ||
|
@@ -181,7 +200,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( | |
// Combine the futures, wait for both to complete, and return the write ahead log record handle | ||
val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2) | ||
val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout) | ||
WriteAheadLogBasedStoreResult(blockId, walRecordHandle) | ||
WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle) | ||
} | ||
|
||
def cleanupOldBlocks(threshTime: Long) { | ||
|
@@ -199,3 +218,23 @@ private[streaming] object WriteAheadLogBasedBlockHandler { | |
new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString | ||
} | ||
} | ||
|
||
/**
 * Wraps an [[Iterator]] and counts the elements handed out through `next()`.
 *
 * The count is only reported once the wrapped iterator has no more elements; while elements
 * remain, `count()` returns `None`.
 */
private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
  // Kept as a Long so the counter cannot wrap around past Int.MaxValue records and so it
  // matches the Option[Long] returned by count().
  private var _count = 0L

  private def isFullyConsumed: Boolean = !iterator.hasNext

  def hasNext(): Boolean = iterator.hasNext

  /** Number of elements returned so far, or `None` while the wrapped iterator has more. */
  def count(): Option[Long] = {
    if (isFullyConsumed) Some(_count) else None
  }

  def next(): T = {
    // Advance the wrapped iterator first: if it throws (for example because it is already
    // exhausted), the failed call must not be counted as a record.
    val element = iterator.next()
    _count += 1
    element
  }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,18 +49,19 @@ class ReceivedBlockHandlerSuite | |
|
||
val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1") | ||
val hadoopConf = new Configuration() | ||
val storageLevel = StorageLevel.MEMORY_ONLY_SER | ||
val streamId = 1 | ||
val securityMgr = new SecurityManager(conf) | ||
val mapOutputTracker = new MapOutputTrackerMaster(conf) | ||
val shuffleManager = new HashShuffleManager(conf) | ||
val serializer = new KryoSerializer(conf) | ||
val manualClock = new ManualClock | ||
val blockManagerSize = 10000000 | ||
val blockManagerBuffer = new ArrayBuffer[BlockManager]() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems like a huge anti-pattern to me! Unless you're testing the interaction of multiple block managers, creating a buffer to manage cleanup seems to suggest bad test design to me. |
||
|
||
var rpcEnv: RpcEnv = null | ||
var blockManagerMaster: BlockManagerMaster = null | ||
var blockManager: BlockManager = null | ||
var storageLevel: StorageLevel = null | ||
var tempDirectory: File = null | ||
|
||
before { | ||
|
@@ -70,20 +71,21 @@ class ReceivedBlockHandlerSuite | |
blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", | ||
new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true) | ||
|
||
blockManager = new BlockManager("bm", rpcEnv, blockManagerMaster, serializer, | ||
blockManagerSize, conf, mapOutputTracker, shuffleManager, | ||
new NioBlockTransferService(conf, securityMgr), securityMgr, 0) | ||
blockManager.initialize("app-id") | ||
storageLevel = StorageLevel.MEMORY_ONLY_SER | ||
blockManager = createBlockManager(blockManagerSize, conf) | ||
|
||
tempDirectory = Utils.createTempDir() | ||
manualClock.setTime(0) | ||
} | ||
|
||
after { | ||
if (blockManager != null) { | ||
blockManager.stop() | ||
blockManager = null | ||
for ( blockManager <- blockManagerBuffer ) { | ||
if (blockManager != null) { | ||
blockManager.stop() | ||
} | ||
} | ||
blockManager = null | ||
blockManagerBuffer.clear() | ||
if (blockManagerMaster != null) { | ||
blockManagerMaster.stop() | ||
blockManagerMaster = null | ||
|
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite | |
} | ||
} | ||
|
||
test("Test Block - count messages") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an example of a testing anti-pattern which I've seen a lot: this test is actually testing two completely independent cases, one with a BlockManager-based block handler and another with a WAL-based handler. As a result, this deserves to be two The fact that this test is four lines and half of the lines are comments explaining the two cases is another hint that this should be two separate tests, since then the name in |
||
// Test count with BlockManagerBasedBlockHandler | ||
testCountWithBlockManagerBasedBlockHandler(true) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that this is a bad function name: if I'm only reading this line and am not jumping around in the IDE, how am I supposed to know what this boolean parameter denotes? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method of switching the handlers in tests is also bad because it doesn't generalize to handle a third type of handler. Instead, I think it would have been better to just pass in either |
||
// Test count with WriteAheadLogBasedBlockHandler | ||
testCountWithBlockManagerBasedBlockHandler(false) | ||
} | ||
|
||
test("Test Block - isFullyConsumed") { | ||
val sparkConf = new SparkConf() | ||
sparkConf.set("spark.storage.unrollMemoryThreshold", "512") | ||
// spark.storage.unrollFraction set to 0.4 for BlockManager | ||
sparkConf.set("spark.storage.unrollFraction", "0.4") | ||
// Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll | ||
blockManager = createBlockManager(12000, sparkConf) | ||
|
||
// there is not enough space to store this block in MEMORY, | ||
// But BlockManager will be able to serialize this block to WAL | ||
// and hence count returns correct value. | ||
testRecordcount(false, StorageLevel.MEMORY_ONLY, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just ran into another bug related to test code from this PR! This part of the test should guarantee that the block actually won't fit, either by making the block size absolutely huge (e.g. 5 megabytes) or by pushing asserts into |
||
IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70)) | ||
|
||
// there is not enough space to store this block in MEMORY, | ||
// But BlockManager will be able to serialize this block to DISK | ||
// and hence count returns correct value. | ||
testRecordcount(true, StorageLevel.MEMORY_AND_DISK, | ||
IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70)) | ||
|
||
// there is not enough space to store this block With MEMORY_ONLY StorageLevel. | ||
// BlockManager will not be able to unroll this block | ||
// and hence it will not tryToPut this block, resulting in a SparkException | ||
storageLevel = StorageLevel.MEMORY_ONLY | ||
withBlockManagerBasedBlockHandler { handler => | ||
val thrown = intercept[SparkException] { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Kind of weird to assign the exception to a variable and then not write any asserts over the exception or its message. |
||
storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator)) | ||
} | ||
} | ||
} | ||
|
||
private def testCountWithBlockManagerBasedBlockHandler(isBlockManagerBasedBlockHandler: Boolean) { | ||
// ByteBufferBlock-MEMORY_ONLY | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that this is a messy / bad way of writing this test. If we want to test a whole bunch of cases, then I think it would make sense to use a for-loop to build up some test cases. I'd also omit the |
||
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY, | ||
ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None) | ||
// ByteBufferBlock-MEMORY_ONLY_SER | ||
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER, | ||
ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None) | ||
// ArrayBufferBlock-MEMORY_ONLY | ||
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY, | ||
ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25)) | ||
// ArrayBufferBlock-MEMORY_ONLY_SER | ||
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER, | ||
ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25)) | ||
// ArrayBufferBlock-DISK_ONLY | ||
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY, | ||
ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50)) | ||
// ArrayBufferBlock-MEMORY_AND_DISK | ||
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK, | ||
ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75)) | ||
// IteratorBlock-MEMORY_ONLY | ||
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY, | ||
IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100)) | ||
// IteratorBlock-MEMORY_ONLY_SER | ||
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER, | ||
IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100)) | ||
// IteratorBlock-DISK_ONLY | ||
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY, | ||
IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125)) | ||
// IteratorBlock-MEMORY_AND_DISK | ||
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK, | ||
IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150)) | ||
} | ||
|
||
/**
 * Builds a BlockManager with the given memory limit and configuration, initializes it
 * with the application id "app-id", and appends it to `blockManagerBuffer` before
 * returning it.
 */
private def createBlockManager(
    maxMem: Long,
    conf: SparkConf,
    name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
  val blockTransferService = new NioBlockTransferService(conf, securityMgr)
  val created = new BlockManager(
    name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
    mapOutputTracker, shuffleManager, blockTransferService, securityMgr, 0)
  created.initialize("app-id")
  blockManagerBuffer += created
  created
}
|
||
/** | ||
* Test storing of data using different types of Handler, StorageLevel and ReceivedBlocks | ||
* and verify the correct record count | ||
*/ | ||
private def testRecordcount(isBlockManagedBasedBlockHandler: Boolean, | ||
sLevel: StorageLevel, | ||
receivedBlock: ReceivedBlock, | ||
bManager: BlockManager, | ||
expectedNumRecords: Option[Long] | ||
) { | ||
blockManager = bManager | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. incorrect indent, but taking care of it before merging. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why re-assign to block manager here? It looks like this is always called with the same block manager, so this is really confusing to me as a reader who's unfamiliar with this code. |
||
storageLevel = sLevel | ||
var bId: StreamBlockId = null | ||
try { | ||
if (isBlockManagedBasedBlockHandler) { | ||
// test received block with BlockManager based handler | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is obvious from the next line; no need to comment on it. |
||
withBlockManagerBasedBlockHandler { handler => | ||
val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. incorrect indent There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can take care of it |
||
bId = blockId | ||
assert(blockStoreResult.numRecords === expectedNumRecords, | ||
"Message count not matches for a " + | ||
receivedBlock.getClass.getName + | ||
" being inserted using BlockManagerBasedBlockHandler with " + sLevel) | ||
} | ||
} else { | ||
// test received block with WAL based handler | ||
withWriteAheadLogBasedBlockHandler { handler => | ||
val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock) | ||
bId = blockId | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. incorrect indent |
||
assert(blockStoreResult.numRecords === expectedNumRecords, | ||
"Message count not matches for a " + | ||
receivedBlock.getClass.getName + | ||
" being inserted using WriteAheadLogBasedBlockHandler with " + sLevel) | ||
} | ||
} | ||
} finally { | ||
// Removing the Block Id to use same blockManager for next test | ||
blockManager.removeBlock(bId, true) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The cost of setup / teardown for the fixtures in this suite should be pretty low, so it seems unnecessarily complicated to try to re-use them like this vs. simply wrapping each case in |
||
} | ||
} | ||
|
||
/** | ||
* Test storing of data using different forms of ReceivedBlocks and verify that they succeeded | ||
* using the given verification function | ||
|
@@ -251,9 +377,21 @@ class ReceivedBlockHandlerSuite | |
(blockIds, storeResults) | ||
} | ||
|
||
/** Store single block using a handler */ | ||
private def storeSingleBlock( | ||
handler: ReceivedBlockHandler, | ||
block: ReceivedBlock | ||
): (StreamBlockId, ReceivedBlockStoreResult) = { | ||
val blockId = generateBlockId | ||
val blockStoreResult = handler.storeBlock(blockId, block) | ||
logDebug("Done inserting") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems like a very low-usefulness log message. At the very least I would have included the |
||
(blockId, blockStoreResult) | ||
} | ||
|
||
/**
 * Returns what `getLogFilesInDirectory` reports for the log directory that
 * `checkpointDirToLogDir` derives from `tempDirectory` and `streamId`.
 */
private def getWriteAheadLogFiles(): Seq[String] = {
  val logDirectory = checkpointDirToLogDir(tempDirectory.toString, streamId)
  getLogFilesInDirectory(logDirectory)
}
|
||
/** Creates a StreamBlockId for `streamId`, passing a random Long as its second argument. */
private def generateBlockId(): StreamBlockId = {
  val randomId = scala.util.Random.nextLong
  StreamBlockId(streamId, randomId)
}
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -225,7 +225,7 @@ class ReceivedBlockTrackerSuite | |
/** Generate blocks infos using random ids */ | ||
def generateBlockInfos(): Seq[ReceivedBlockInfo] = { | ||
List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None, | ||
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt))))) | ||
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L)))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why change it to 0 from Random.nextInt? Let's not touch other testsuites without understanding the implications. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did not change the Random.nextInt. Just added 0 as BlockManagerBasedStoreResult now takes the recordCount. This testcase is for testing the recordcount 0 which @zsxwing did, which I changed after the merge. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not an issue |
||
} | ||
|
||
/** Get all the data written in the given write ahead log file. */ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just a preference thing I guess, but this would sound better if it was called `recordCount`, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In all other places where the count is recorded (see this PR: https://github.com/apache/spark/pull/6659/files), it is called numRecords. I just wanted to keep the naming consistent across all classes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, ok. I just find the `num*` method calls weird, when it could be called `*count`. But if it is consistent with everything else, then it is fine.