[SPARK-37695][CORE][SHUFFLE] Skip diagnosis on merged blocks from push-based shuffle #34961

Closed
wants to merge 4 commits

Conversation

@pan3793 (Member) commented Dec 20, 2021

What changes were proposed in this pull request?

Skip diagnosis on merged blocks from push-based shuffle.

Why are the changes needed?

Because SPARK-36284 (shuffle checksum support for push-based shuffle) has not been addressed yet, skip the diagnosis for merged blocks to suppress exceptions like the following:

```
21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID 138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed: Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
	at scala.Predef$.assert(Predef.scala:223)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
	at org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307)
	at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
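For context, the assertion that fires is the type check at the top of `diagnoseCorruption`, which assumed every corrupted block is a regular shuffle block. Below is a minimal self-contained sketch of the pre-fix behavior; the `BlockId` hierarchy is a simplified stand-in for Spark's `org.apache.spark.storage` types, not the exact Spark source:

```scala
sealed trait BlockId {
  def isShuffleChunk: Boolean = false
}
// Regular shuffle block, e.g. "shuffle_464_0_5645".
case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
// Merged block chunk from push-based shuffle, e.g. "shuffleChunk_464_0_5645_0".
case class ShuffleBlockChunkId(shuffleId: Int, shuffleMergeId: Int, reduceId: Int, chunkId: Int)
    extends BlockId {
  override def isShuffleChunk: Boolean = true
}

// Pre-fix shape: the diagnosis unconditionally asserts a regular shuffle block,
// so any shuffleChunk_* id from push-based shuffle throws the AssertionError above.
def diagnoseCorruption(checksum: Long, blockId: BlockId): String = {
  assert(blockId.isInstanceOf[ShuffleBlockId], s"Expected ShuffleBlockId, but got $blockId")
  s"diagnosis result for $blockId" // checksum comparison elided
}
```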

Does this PR introduce any user-facing change?

Yes, it suppresses the exceptions.

How was this patch tested?

Ran the 1TB TPC-DS benchmark manually.

@github-actions github-actions bot added the CORE label Dec 20, 2021
```scala
  .filterNot { _ => blockId.isShuffleChunk }
  .map { checkedIn =>
    iterator.diagnoseCorruption(checkedIn, address, blockId)
  }
```
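Completed into a runnable form, this first revision of the guard looks roughly like the following; it reuses the simplified types and `diagnoseCorruption` sketch above, `checkedIn` stands in for the optionally available checked-in checksum, and the `iterator.` receiver is dropped:

```scala
// Guard at the call site: only attempt a diagnosis when the block is a regular
// shuffle block, never a merged shuffle chunk from push-based shuffle.
def maybeDiagnose(checkedIn: Option[Long], blockId: BlockId): Option[String] =
  checkedIn
    .filterNot { _ => blockId.isShuffleChunk }
    .map { checksum => diagnoseCorruption(checksum, blockId) }
```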
Member:
Shall we do the check inside diagnoseCorruption() to ensure other calls on it are also safe?

Member Author:
Updated

```scala
case shuffleBlockChunk: ShuffleBlockChunkId =>
  // TODO SPARK-36284 Add shuffle checksum support for push-based shuffle
  val diagnosisResponse = s"BlockChunk $shuffleBlockChunk is corrupted but corruption " +
    s"diagnosis failed due to lack of shuffle checksum support for push-based shuffle."
```
Member:
"...diagnosis is skipped due to..."?

Member Author:
Updated

```scala
@@ -1037,39 +1037,48 @@ final class ShuffleBlockFetcherIterator(
      blockId: BlockId): String = {
    logInfo("Start corruption diagnosis.")
    val startTimeNs = System.nanoTime()
```
Member:
This line could move to ShuffleBlockId case only.

Member Author:
Updated
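Putting the review rounds together (check moved inside the method, message reworded to "skipped", timing moved into the `ShuffleBlockId` case), the method ends up shaped roughly as follows. This is a sketch over the same simplified types as above, with `println` standing in for Spark's logging and the method renamed to avoid clashing with the pre-fix sketch:

```scala
def diagnoseCorruptionUpdated(checksum: Long, blockId: BlockId): String = blockId match {
  case shuffleBlock: ShuffleBlockId =>
    // Timing only matters when a diagnosis actually runs, so it lives in this case now.
    val startTimeNs = System.nanoTime()
    val response = s"diagnosis result for $shuffleBlock" // checksum comparison elided
    println(s"Corruption diagnosis took ${(System.nanoTime() - startTimeNs) / 1000000L} ms")
    response
  case shuffleBlockChunk: ShuffleBlockChunkId =>
    // TODO SPARK-36284 Add shuffle checksum support for push-based shuffle
    val diagnosisResponse = s"BlockChunk $shuffleBlockChunk is corrupted but corruption " +
      "diagnosis is skipped due to lack of shuffle checksum support for push-based shuffle"
    println(diagnosisResponse)
    diagnosisResponse
  case unexpected =>
    s"Unsupported block id $unexpected for corruption diagnosis"
}
```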

@Ngone51 (Member) commented Dec 21, 2021

LGTM except 2 minor comments.

@Ngone51 commented Dec 21, 2021

cc @mridulm

@yaooqinn (Member)

ok to test

@yaooqinn left a comment

LGTM

@SparkQA commented Dec 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50902/

@SparkQA commented Dec 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50902/

@SparkQA commented Dec 21, 2021

Test build #146427 has finished for PR 34961 at commit ce42f59.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Can one of the admins verify this patch?

@Ngone51 closed this in 57ca75f Dec 23, 2021
Ngone51 pushed a commit that referenced this pull request Dec 23, 2021
[SPARK-37695][CORE][SHUFFLE] Skip diagnosis on merged blocks from push-based shuffle
Closes #34961 from pan3793/SPARK-37695.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: yi.wu <yi.wu@databricks.com>
(cherry picked from commit 57ca75f)
Signed-off-by: yi.wu <yi.wu@databricks.com>
@Ngone51 commented Dec 23, 2021

Thanks, merged to master, branch-3.2.

@pan3793 deleted the SPARK-37695 branch December 27, 2021 12:15
catalinii pushed a commit to lyft/spark that referenced this pull request Feb 22, 2022
[SPARK-37695][CORE][SHUFFLE] Skip diagnosis on merged blocks from push-based shuffle
catalinii pushed a commit to lyft/spark that referenced this pull request Mar 4, 2022
[SPARK-37695][CORE][SHUFFLE] Skip diagnosis on merged blocks from push-based shuffle
kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022
[SPARK-37695][CORE][SHUFFLE] Skip diagnosis on merged blocks from push-based shuffle