[SPARK-37695][CORE][SHUFFLE] Skip diagnosis on merged blocks from push-based shuffle

### What changes were proposed in this pull request?

Skip corruption diagnosis on merged blocks from push-based shuffle.

### Why are the changes needed?

Because SPARK-36284 (shuffle checksum support for push-based shuffle) has not been addressed yet, corruption diagnosis is skipped for merged blocks to suppress exceptions like the following:

```
21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
	at scala.Predef$.assert(Predef.scala:223)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
	at org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307)
	at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
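For illustration, here is a minimal, self-contained sketch of why the old assertion fires. The types below are simplified stand-ins for Spark's `BlockId` hierarchy, not the real `org.apache.spark.storage` classes: push-based shuffle fetches merged chunks identified by a `ShuffleBlockChunkId` (rendered like `shuffleChunk_464_0_5645_0` in the log above), which the pre-patch `diagnoseCorruption` rejected.

```scala
// Minimal sketch: simplified stand-ins for Spark's BlockId hierarchy,
// NOT the real org.apache.spark.storage classes.
sealed trait BlockId
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
case class ShuffleBlockChunkId(shuffleId: Int, shuffleMergeId: Int, reduceId: Int, chunkId: Int)
    extends BlockId {
  // Mirrors the name format seen in the log above: shuffleChunk_464_0_5645_0
  override def toString: String =
    s"shuffleChunk_${shuffleId}_${shuffleMergeId}_${reduceId}_$chunkId"
}

object OldDiagnosis {
  // The pre-patch logic assumed every corrupted block was a plain ShuffleBlockId.
  def diagnoseCorruption(blockId: BlockId): String = {
    assert(blockId.isInstanceOf[ShuffleBlockId], s"Expected ShuffleBlockId, but got $blockId")
    s"diagnosed $blockId"
  }

  def main(args: Array[String]): Unit = {
    diagnoseCorruption(ShuffleBlockId(464, 0L, 5645))        // ok
    diagnoseCorruption(ShuffleBlockChunkId(464, 0, 5645, 0)) // AssertionError, as in the log
  }
}
```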

### Does this PR introduce _any_ user-facing change?

Yes, the exceptions above are suppressed.

### How was this patch tested?

Ran the 1 TB TPC-DS benchmark manually.

Closes apache#34961 from pan3793/SPARK-37695.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: yi.wu <yi.wu@databricks.com>
pan3793 authored and Ngone51 committed Dec 23, 2021
1 parent a70006d commit 57ca75f
Showing 1 changed file with 39 additions and 30 deletions.
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:

```diff
@@ -1036,40 +1036,49 @@ final class ShuffleBlockFetcherIterator(
       address: BlockManagerId,
       blockId: BlockId): String = {
     logInfo("Start corruption diagnosis.")
-    val startTimeNs = System.nanoTime()
-    assert(blockId.isInstanceOf[ShuffleBlockId], s"Expected ShuffleBlockId, but got $blockId")
-    val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId]
-    val buffer = new Array[Byte](ShuffleChecksumHelper.CHECKSUM_CALCULATION_BUFFER)
-    // consume the remaining data to calculate the checksum
-    var cause: Cause = null
-    try {
-      while (checkedIn.read(buffer) != -1) {}
-      val checksum = checkedIn.getChecksum.getValue
-      cause = shuffleClient.diagnoseCorruption(address.host, address.port, address.executorId,
-        shuffleBlock.shuffleId, shuffleBlock.mapId, shuffleBlock.reduceId, checksum,
-        checksumAlgorithm)
-    } catch {
-      case e: Exception =>
-        logWarning("Unable to diagnose the corruption cause of the corrupted block", e)
-        cause = Cause.UNKNOWN_ISSUE
-    }
-    val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
-    val diagnosisResponse = cause match {
-      case Cause.UNSUPPORTED_CHECKSUM_ALGORITHM =>
-        s"Block $blockId is corrupted but corruption diagnosis failed due to " +
-          s"unsupported checksum algorithm: $checksumAlgorithm"
+    blockId match {
+      case shuffleBlock: ShuffleBlockId =>
+        val startTimeNs = System.nanoTime()
+        val buffer = new Array[Byte](ShuffleChecksumHelper.CHECKSUM_CALCULATION_BUFFER)
+        // consume the remaining data to calculate the checksum
+        var cause: Cause = null
+        try {
+          while (checkedIn.read(buffer) != -1) {}
+          val checksum = checkedIn.getChecksum.getValue
+          cause = shuffleClient.diagnoseCorruption(address.host, address.port, address.executorId,
+            shuffleBlock.shuffleId, shuffleBlock.mapId, shuffleBlock.reduceId, checksum,
+            checksumAlgorithm)
+        } catch {
+          case e: Exception =>
+            logWarning("Unable to diagnose the corruption cause of the corrupted block", e)
+            cause = Cause.UNKNOWN_ISSUE
+        }
+        val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
+        val diagnosisResponse = cause match {
+          case Cause.UNSUPPORTED_CHECKSUM_ALGORITHM =>
+            s"Block $blockId is corrupted but corruption diagnosis failed due to " +
+              s"unsupported checksum algorithm: $checksumAlgorithm"
 
-      case Cause.CHECKSUM_VERIFY_PASS =>
-        s"Block $blockId is corrupted but checksum verification passed"
+          case Cause.CHECKSUM_VERIFY_PASS =>
+            s"Block $blockId is corrupted but checksum verification passed"
 
-      case Cause.UNKNOWN_ISSUE =>
-        s"Block $blockId is corrupted but the cause is unknown"
+          case Cause.UNKNOWN_ISSUE =>
+            s"Block $blockId is corrupted but the cause is unknown"
 
-      case otherCause =>
-        s"Block $blockId is corrupted due to $otherCause"
+          case otherCause =>
+            s"Block $blockId is corrupted due to $otherCause"
+        }
+        logInfo(s"Finished corruption diagnosis in $duration ms. $diagnosisResponse")
+        diagnosisResponse
+      case shuffleBlockChunk: ShuffleBlockChunkId =>
+        // TODO SPARK-36284 Add shuffle checksum support for push-based shuffle
+        val diagnosisResponse = s"BlockChunk $shuffleBlockChunk is corrupted but corruption " +
+          s"diagnosis is skipped due to lack of shuffle checksum support for push-based shuffle."
+        logWarning(diagnosisResponse)
+        diagnosisResponse
+      case unexpected: BlockId =>
+        throw new IllegalArgumentException(s"Unexpected type of BlockId, $unexpected")
     }
-    logInfo(s"Finished corruption diagnosis in $duration ms. $diagnosisResponse")
-    diagnosisResponse
   }
 
   def toCompletionIterator: Iterator[(BlockId, InputStream)] = {
```
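Condensed, the control flow the patch introduces looks like the sketch below, reusing the simplified `BlockId` stand-ins from the earlier sketch; the checksum computation and shuffle-client call of the real method are stubbed out. Plain shuffle blocks are diagnosed as before, merged chunks are logged and skipped until SPARK-36284 lands, and anything else fails fast.

```scala
// Sketch of the patched dispatch, using the simplified BlockId stand-ins above;
// the real method also streams the remaining data, computes a checksum, and
// queries the shuffle client for the corruption cause.
def diagnoseCorruption(blockId: BlockId): String = blockId match {
  case shuffleBlock: ShuffleBlockId =>
    s"Block $shuffleBlock is corrupted due to <cause from checksum diagnosis>"
  case shuffleBlockChunk: ShuffleBlockChunkId =>
    // TODO SPARK-36284: no checksum support for push-based shuffle yet, so skip.
    s"BlockChunk $shuffleBlockChunk is corrupted but corruption diagnosis is skipped " +
      "due to lack of shuffle checksum support for push-based shuffle."
  case unexpected =>
    throw new IllegalArgumentException(s"Unexpected type of BlockId, $unexpected")
}
```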
