
[SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI #6707


Closed: wants to merge 7 commits
@@ -32,7 +32,10 @@ import org.apache.spark.{Logging, SparkConf, SparkException}

/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
def blockId: StreamBlockId // Any implementation of this trait will store a block id
// Any implementation of this trait will store a block id
def blockId: StreamBlockId
// Any implementation of this trait will have to return the number of records
def numRecords: Option[Long]
Contributor:
This is just a preference thing I guess, but this would sound better if it was called recordCount, no?

Author:
Everywhere else the count is recorded (see PR https://github.com/apache/spark/pull/6659/files), it is called numRecords. I just wanted to keep the naming consistent across all classes.

Contributor:
Ah, OK. I just find the num* names weird when they could be called *count. But if it is consistent with everything else, then it is fine.

}

/** Trait that represents a class that handles the storage of blocks received by receiver */
@@ -51,7 +54,8 @@ private[streaming] trait ReceivedBlockHandler {
* that stores the metadata related to storage of blocks using
* [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
*/
private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
private[streaming] case class BlockManagerBasedStoreResult(
blockId: StreamBlockId, numRecords: Option[Long])
extends ReceivedBlockStoreResult


@@ -64,11 +68,20 @@ private[streaming] class BlockManagerBasedBlockHandler(
extends ReceivedBlockHandler with Logging {

def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

var numRecords = None: Option[Long]

val putResult: Seq[(BlockId, BlockStatus)] = block match {
case ArrayBufferBlock(arrayBuffer) =>
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
numRecords = Some(arrayBuffer.size.toLong)
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
tellMaster = true)
case IteratorBlock(iterator) =>
blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
val countIterator = new CountingIterator(iterator)
val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
tellMaster = true)
numRecords = countIterator.count
putResult
case ByteBufferBlock(byteBuffer) =>
blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
case o =>
@@ -79,7 +92,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
}
BlockManagerBasedStoreResult(blockId)
BlockManagerBasedStoreResult(blockId, numRecords)
Contributor:
If a ByteBufferBlock is added, then this means that none of the data ever gets counted as a record. This is a bit of an issue, since the number of records being zero does not make sense.

Author:
But how can we count a ByteBufferBlock? If you count one block as 1 message, that is also wrong.

Contributor:
Well, technically it is a single record - though I agree that is not exactly right either, but it must count as at least 1, correct?

Author:
@tdas @zsxwing what do you think? Is it fine to count a ByteBufferBlock as 1?

Contributor:
The bytebuffer is expected to have multiple serialized records in it, so I think it is more wrong to count it as 1. So None is better than Some(1)

Contributor:
Both are wrong, but counting it as zero means we end up suggesting that no data was received. I personally think they should be counted as 1; we can document it.
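For context on this trade-off, a minimal sketch of why a ByteBuffer handed to store() generally holds a whole serialized batch rather than one record (the receiver-side code below is illustrative, not the actual receiver API):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Serialize three records into one buffer, as a custom receiver might
// before calling store(byteBuffer). The buffer holds 3 records, so
// reporting Some(0) or Some(1) would both misstate the count.
val records = Seq("a", "b", "c")
val bytesOut = new ByteArrayOutputStream()
val objectOut = new ObjectOutputStream(bytesOut)
records.foreach(objectOut.writeObject)
objectOut.close()
val block = ByteBuffer.wrap(bytesOut.toByteArray)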

}

def cleanupOldBlocks(threshTime: Long) {
@@ -96,6 +109,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
*/
private[streaming] case class WriteAheadLogBasedStoreResult(
blockId: StreamBlockId,
numRecords: Option[Long],
walRecordHandle: WriteAheadLogRecordHandle
) extends ReceivedBlockStoreResult

@@ -151,12 +165,17 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
*/
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

var numRecords = None: Option[Long]
// Serialize the block so that it can be inserted into both the block manager and the write ahead log
val serializedBlock = block match {
case ArrayBufferBlock(arrayBuffer) =>
numRecords = Some(arrayBuffer.size.toLong)
blockManager.dataSerialize(blockId, arrayBuffer.iterator)
case IteratorBlock(iterator) =>
blockManager.dataSerialize(blockId, iterator)
val countIterator = new CountingIterator(iterator)
val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
numRecords = countIterator.count
serializedBlock
case ByteBufferBlock(byteBuffer) =>
Contributor:
Same issue as before: how do we count ByteBufferBlocks?

byteBuffer
case _ =>
Expand All @@ -181,7 +200,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
// Combine the futures, wait for both to complete, and return the write ahead log record handle
val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
}

def cleanupOldBlocks(threshTime: Long) {
@@ -199,3 +218,23 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
}
}

/**
 * A utility that wraps an Iterator to count the records it returns
*/
private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
private var _count = 0

private def isFullyConsumed: Boolean = !iterator.hasNext

def hasNext(): Boolean = iterator.hasNext

def count(): Option[Long] = {
if (isFullyConsumed) Some(_count) else None
}

def next(): T = {
_count += 1
iterator.next()
}
}
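A minimal usage sketch of the semantics above (ignoring the private modifier for illustration): count() stays None until the wrapped iterator has been fully drained, which is what lets the handlers report a count only when the store actually consumed every record:

val it = new CountingIterator(Iterator(1, 2, 3))
assert(it.count() == None)     // nothing consumed yet, count unknown
it.next()
assert(it.count() == None)     // partially consumed, still unknown
it.foreach(_ => ())            // drain the remaining records
assert(it.count() == Some(3))  // fully consumed: all 3 records counted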
@@ -137,15 +137,10 @@ private[streaming] class ReceiverSupervisorImpl(
blockIdOption: Option[StreamBlockId]
) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val numRecords = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong)
case _ => None
}

val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")

val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
@@ -49,18 +49,19 @@ class ReceivedBlockHandlerSuite

val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
val hadoopConf = new Configuration()
val storageLevel = StorageLevel.MEMORY_ONLY_SER
val streamId = 1
val securityMgr = new SecurityManager(conf)
val mapOutputTracker = new MapOutputTrackerMaster(conf)
val shuffleManager = new HashShuffleManager(conf)
val serializer = new KryoSerializer(conf)
val manualClock = new ManualClock
val blockManagerSize = 10000000
val blockManagerBuffer = new ArrayBuffer[BlockManager]()
Contributor:
This seems like a huge anti-pattern to me! Unless you're testing the interaction of multiple block managers, creating a buffer to manage cleanup seems to suggest bad test design to me.
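One way to avoid the shared buffer would be a loan-pattern helper that scopes any extra BlockManager to the code that needs it; a sketch, where withExtraBlockManager is a hypothetical helper built on the suite's existing createBlockManager (minus the buffer bookkeeping):

private def withExtraBlockManager(maxMem: Long, conf: SparkConf)
    (body: BlockManager => Unit): Unit = {
  val manager = createBlockManager(maxMem, conf)
  try {
    body(manager)
  } finally {
    manager.stop()  // cleanup is local to the test, not deferred to a shared after {} block
  }
}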


var rpcEnv: RpcEnv = null
var blockManagerMaster: BlockManagerMaster = null
var blockManager: BlockManager = null
var storageLevel: StorageLevel = null
var tempDirectory: File = null

before {
@@ -70,20 +71,21 @@ class ReceivedBlockHandlerSuite
blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)

blockManager = new BlockManager("bm", rpcEnv, blockManagerMaster, serializer,
blockManagerSize, conf, mapOutputTracker, shuffleManager,
new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
blockManager.initialize("app-id")
storageLevel = StorageLevel.MEMORY_ONLY_SER
blockManager = createBlockManager(blockManagerSize, conf)

tempDirectory = Utils.createTempDir()
manualClock.setTime(0)
}

after {
if (blockManager != null) {
blockManager.stop()
blockManager = null
for ( blockManager <- blockManagerBuffer ) {
if (blockManager != null) {
blockManager.stop()
}
}
blockManager = null
blockManagerBuffer.clear()
if (blockManagerMaster != null) {
blockManagerMaster.stop()
blockManagerMaster = null
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
}
}

test("Test Block - count messages") {
Contributor:
This is an example of a testing anti-pattern which I've seen a lot: this test is actually testing two completely independent cases, one with a BlockManager-based block handler and another with a WAL-based handler. As a result, this deserves to be two test cases so that they can fail independently. As things stand now, a change could introduce a bug that causes only the BlockManager case to fail, and the second case would then be skipped rather than run.

The fact that this test is four lines and half of the lines are comments explaining the two cases is another hint that this should be two separate tests, since then the name in test() could explain the cases.
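The split asked for here is mechanical; a sketch using the suite's existing helper:

test("Block count with BlockManagerBasedBlockHandler") {
  testCountWithBlockManagerBasedBlockHandler(true)
}

test("Block count with WriteAheadLogBasedBlockHandler") {
  testCountWithBlockManagerBasedBlockHandler(false)
}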

// Test count with BlockManagedBasedBlockHandler
testCountWithBlockManagerBasedBlockHandler(true)
Contributor:
I think that this is a bad function name: if I'm only reading this line and am not jumping around in the IDE, how am I supposed to know what this boolean parameter denotes?

Contributor:
This method of switching the handlers in tests is also bad because it doesn't generalize to handle a third type of handler. Instead, I think it would have been better to just pass in either withBlockManagerBasedBlockHandler or withWriteAheadLogBasedBlockHandler into some function, since both of those methods are instances of a common function supertype. Higher-order functions are a great language feature and should be used more in test code where appropriate.
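A sketch of that higher-order shape (the testCounts name is hypothetical); assuming each wrapper takes a function over its specific handler type, contravariance of function arguments lets both conform to the shared parameter type:

// Both withBlockManagerBasedBlockHandler and withWriteAheadLogBasedBlockHandler
// conform to (ReceivedBlockHandler => Unit) => Unit, so the wrapper itself can
// be the parameter instead of a Boolean flag.
private def testCounts(withHandler: (ReceivedBlockHandler => Unit) => Unit): Unit = {
  withHandler { handler =>
    // store blocks through `handler` and assert on numRecords here
  }
}

// Usage generalizes to any number of handler types:
testCounts(withBlockManagerBasedBlockHandler)
testCounts(withWriteAheadLogBasedBlockHandler)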

// Test count with WriteAheadLogBasedBlockHandler
testCountWithBlockManagerBasedBlockHandler(false)
}

test("Test Block - isFullyConsumed") {
val sparkConf = new SparkConf()
sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
// spark.storage.unrollFraction set to 0.4 for BlockManager
sparkConf.set("spark.storage.unrollFraction", "0.4")
// Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
blockManager = createBlockManager(12000, sparkConf)

// there is not enough space to store this block in MEMORY,
// But BlockManager will be able to serialize this block to WAL
// and hence count returns correct value.
testRecordcount(false, StorageLevel.MEMORY_ONLY,
Contributor:
Just ran into another bug related to test code from this PR! This part of the test should guarantee that the block actually won't fit, either by making the block size absolutely huge (e.g. 5 megabytes) or by pushing asserts into testRecordcount. As it stands now, this test doesn't exercise the right thing because the block actually is stored in master. (One concrete fix is sketched below.)

IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
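One concrete fix along the reviewer's lines (sizes illustrative): make each record large enough that 70 of them clearly exceed the 4800-byte unroll space, for example:

IteratorBlock(List.fill(70)(new Array[Byte](100 * 1024)).iterator)  // ~7 MB, cannot unroll in 4800 bytes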

// there is not enough space to store this block in MEMORY,
// But BlockManager will be able to serialize this block to DISK
// and hence count returns correct value.
testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))

// there is not enough space to store this block With MEMORY_ONLY StorageLevel.
// BlockManager will not be able to unroll this block
// and hence it will not tryToPut this block, resulting in a SparkException
storageLevel = StorageLevel.MEMORY_ONLY
withBlockManagerBasedBlockHandler { handler =>
val thrown = intercept[SparkException] {
Contributor:
Kind of weird to assign the exception to a variable and then not write any asserts over the exception or its message. (A sketch of such an assert follows this test.)

storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
}
}
}
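A sketch of the missing assertion; the expected text comes from the SparkException thrown in BlockManagerBasedBlockHandler.storeBlock ("Could not store $blockId to block manager with storage level $storageLevel"):

withBlockManagerBasedBlockHandler { handler =>
  val thrown = intercept[SparkException] {
    storeSingleBlock(handler, IteratorBlock(List.fill(70)(new Array[Byte](100)).iterator))
  }
  assert(thrown.getMessage.contains("Could not store"))
}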

private def testCountWithBlockManagerBasedBlockHandler(isBlockManagerBasedBlockHandler: Boolean) {
// ByteBufferBlock-MEMORY_ONLY
Contributor:
I think that this is a messy / bad way of writing this test. If we want to test a whole bunch of cases, then I think it would make sense to use a for-loop to build up some test cases (see the sketch after this method). I'd also omit the // comments here, since they're not helpful to me as a reader.

testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
// ByteBufferBlock-MEMORY_ONLY_SER
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
// ArrayBufferBlock-MEMORY_ONLY
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
// ArrayBufferBlock-MEMORY_ONLY_SER
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
// ArrayBufferBlock-DISK_ONLY
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
// ArrayBufferBlock-MEMORY_AND_DISK
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
// IteratorBlock-MEMORY_ONLY
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
// IteratorBlock-MEMORY_ONLY_SER
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
// IteratorBlock-DISK_ONLY
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
// IteratorBlock-MEMORY_AND_DISK
testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
}
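A sketch of the table-driven form suggested above (testRecordCountCases is a hypothetical name; only a few representative rows are shown):

private def testRecordCountCases(isBlockManagerBasedBlockHandler: Boolean): Unit = {
  val cases: Seq[(StorageLevel, ReceivedBlock, Option[Long])] = Seq(
    (StorageLevel.MEMORY_ONLY, ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(_.toByte))), None),
    (StorageLevel.MEMORY_ONLY, ArrayBufferBlock(ArrayBuffer.fill(25)(0)), Some(25L)),
    (StorageLevel.DISK_ONLY, IteratorBlock(ArrayBuffer.fill(125)(0).iterator), Some(125L))
    // ... remaining (storage level, block, expected count) rows
  )
  for ((level, block, expected) <- cases) {
    testRecordcount(isBlockManagerBasedBlockHandler, level, block, blockManager, expected)
  }
}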

private def createBlockManager(
maxMem: Long,
conf: SparkConf,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
manager.initialize("app-id")
blockManagerBuffer += manager
manager
}

/**
 * Test storing of data using different types of handlers, StorageLevels and ReceivedBlocks
* and verify the correct record count
*/
private def testRecordcount(isBlockManagedBasedBlockHandler: Boolean,
sLevel: StorageLevel,
receivedBlock: ReceivedBlock,
bManager: BlockManager,
expectedNumRecords: Option[Long]
) {
blockManager = bManager
Contributor:
incorrect indent, but taking care of it before merging.

Contributor:
Why re-assign to block manager here? It looks like this is always called with the same block manager, so this is really confusing to me as a reader who's unfamiliar with this code.

storageLevel = sLevel
var bId: StreamBlockId = null
try {
if (isBlockManagedBasedBlockHandler) {
// test received block with BlockManager based handler
Contributor:
This is obvious from the next line; no need to comment on it.

withBlockManagerBasedBlockHandler { handler =>
val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock)
Contributor:
incorrect indent

Contributor:
I can take care of it

bId = blockId
assert(blockStoreResult.numRecords === expectedNumRecords,
"Message count not matches for a " +
receivedBlock.getClass.getName +
" being inserted using BlockManagerBasedBlockHandler with " + sLevel)
}
} else {
// test received block with WAL based handler
withWriteAheadLogBasedBlockHandler { handler =>
val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock)
bId = blockId
Contributor:
incorrect indent

assert(blockStoreResult.numRecords === expectedNumRecords,
"Message count not matches for a " +
receivedBlock.getClass.getName +
" being inserted using WriteAheadLogBasedBlockHandler with " + sLevel)
}
}
} finally {
// Remove the block so the same blockManager can be reused by the next test
blockManager.removeBlock(bId, true)
Contributor:
The cost of setup / teardown for the fixtures in this suite should be pretty low, so it seems unnecessarily complicated to try to re-use them like this vs. simply wrapping each case in test(..) {} and letting BeforeAndAfter handle it.

}
}

/**
* Test storing of data using different forms of ReceivedBlocks and verify that they succeeded
* using the given verification function
@@ -251,9 +377,21 @@ class ReceivedBlockHandlerSuite
(blockIds, storeResults)
}

/** Store single block using a handler */
private def storeSingleBlock(
handler: ReceivedBlockHandler,
block: ReceivedBlock
): (StreamBlockId, ReceivedBlockStoreResult) = {
val blockId = generateBlockId
val blockStoreResult = handler.storeBlock(blockId, block)
logDebug("Done inserting")
Contributor:
This seems like a very low-usefulness log message. At the very least I would have included the blockId and the outcome of storing (success or failure).
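// A more informative message along the reviewer's lines might be (a sketch):
// logDebug(s"Stored block $blockId; store result: $blockStoreResult")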

(blockId, blockStoreResult)
}

private def getWriteAheadLogFiles(): Seq[String] = {
getLogFilesInDirectory(checkpointDirToLogDir(tempDirectory.toString, streamId))
}

private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong)
}

@@ -225,7 +225,7 @@ class ReceivedBlockTrackerSuite
/** Generate blocks infos using random ids */
def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
Contributor:
Why change it to 0 from Random.nextInt? Let's not touch other testsuites without understanding the implications.

Author:
I did not change Random.nextInt. I just added the 0 because BlockManagerBasedStoreResult now takes the record count. This test case is for testing a record count of 0, which @zsxwing added and which I changed after the merge.

Author:
this is not an issue

}

/** Get all the data written in the given write ahead log file. */