Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1720] Prevent stage re-run if another task attempt is running or successful #2921

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

turboFei
Copy link
Member

@turboFei turboFei commented Nov 15, 2024

What changes were proposed in this pull request?

Prevent stage re-run if another task attempt is running.

If a shuffle read task can not read the shuffle data and the task another attempt is running or successful, just throw the CelebornIOException instead of FetchFailureException.

The app will not failure before reach the task maxFailures.

image

Why are the changes needed?

I met below issue because I set the wrong parameters, I should set spark.celeborn.data.io.connectTime=30s but set the spark.celeborn.data.io.connectionTime=30s, and the Disk IO Utils was high at that time.

  1. speculation is enabled
  2. one task failed to fetch shuffle 0 in stage 5.
  3. then it triggered the stage 0 re-run (stage 4)
  4. then stage 5 retry, however, no task run in stage 5 (retry 1)
image 4. because the speculation task succeeded, so no task in stage 5(retry 1) image

Due the stage re-run is heavy, so I wonder that, we should ignore the shuffle fetch failure, if there is another task attempt running.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

UT for the SparkUtils method only, due it is impossible to add UT for speculation.

https://github.com/apache/spark/blob/d5da49d56d7dec5f8a96c5252384d865f7efd4d9/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L236-L244

image

For local master, it would not start the speculationScheduler.

https://github.com/apache/spark/blob/d5da49d56d7dec5f8a96c5252384d865f7efd4d9/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L322-L346

image

and it is also not allowed to launch speculative task on the same host.

@turboFei turboFei marked this pull request as draft November 15, 2024 00:12
@turboFei turboFei changed the title Task [WIP][CELEBORN-1720] Prevent stage re-run if task another attempt is running Nov 15, 2024
@turboFei turboFei force-pushed the task_id branch 3 times, most recently from 8c43cc2 to ef61cb3 Compare November 15, 2024 00:28
@turboFei turboFei changed the title [WIP][CELEBORN-1720] Prevent stage re-run if task another attempt is running [WIP][CELEBORN-1720] Prevent stage re-run if another task attempt is running Nov 15, 2024
@turboFei turboFei force-pushed the task_id branch 2 times, most recently from e1d465b to f6b9d24 Compare November 15, 2024 02:25
@turboFei turboFei marked this pull request as ready for review November 15, 2024 03:31
@turboFei
Copy link
Member Author

Seems difficult to add UT, how do you think about? @FMX

@turboFei turboFei marked this pull request as draft November 17, 2024 23:37
@turboFei turboFei marked this pull request as ready for review November 20, 2024 18:26
@turboFei turboFei changed the title [WIP][CELEBORN-1720] Prevent stage re-run if another task attempt is running [CELEBORN-1720] Prevent stage re-run if another task attempt is running Nov 20, 2024
@turboFei turboFei changed the title [CELEBORN-1720] Prevent stage re-run if another task attempt is running [CELEBORN-1720] Prevent stage re-run if another task attempt is running or successful Nov 20, 2024
@turboFei
Copy link
Member Author

It is too difficult to add the UT.

Gentle ping @mridulm

@FMX
Copy link
Contributor

FMX commented Nov 21, 2024

Seems difficult to add UT, how do you think about? @FMX

Hi, I see this PR. IMO, you can add a test config to trigger task hang and fetch failure in certain map tasks. Maybe it won't be too difficult to add UTs.

@turboFei
Copy link
Member Author

R. IMO, you can add a test config to trigger task hang and fetch failure in certain map tasks. Maybe it won't be too difficult to add UTs.

Thanks, added UT and tested locally.

FMX pushed a commit that referenced this pull request Nov 21, 2024
### What changes were proposed in this pull request?
Fix NPE. When failed to connect to celeborn worker, the currentReader might be `null`.

### Why are the changes needed?

I am testing #2921 in the celeborn cluster.

And set the `celeborn.data.io.connectionTimeout` to 30s for fetch failure testing, and it failed to connect to celeborn worker for 3 times, and then the currentReader was null.

<img width="1700" alt="image" src="https://github.com/user-attachments/assets/9473294d-2cca-4f8b-bc86-ab6f70f04cff">

https://github.com/turboFei/incubator-celeborn/blob/2be9682a34f97ff10b90f22f60d9fea2bc5b81b7/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java#L672

```
24/11/20 16:15:41 ERROR Executor: Exception in task 16238.0 in stage 9.0 (TID 108550)
java.lang.NullPointerException
	at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.fillBuffer(CelebornInputStream.java:672)
	at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.read(CelebornInputStream.java:515)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at java.io.DataInputStream.read(DataInputStream.java:149)
	at org.sparkproject.guava.io.ByteStreams.read(ByteStreams.java:899)
	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:733)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:496)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:756)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
	at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
GA.

Closes #2933 from turboFei/npe_reader.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
FMX pushed a commit that referenced this pull request Nov 21, 2024
### What changes were proposed in this pull request?
Fix NPE. When failed to connect to celeborn worker, the currentReader might be `null`.

### Why are the changes needed?

I am testing #2921 in the celeborn cluster.

And set the `celeborn.data.io.connectionTimeout` to 30s for fetch failure testing, and it failed to connect to celeborn worker for 3 times, and then the currentReader was null.

<img width="1700" alt="image" src="https://github.com/user-attachments/assets/9473294d-2cca-4f8b-bc86-ab6f70f04cff">

https://github.com/turboFei/incubator-celeborn/blob/2be9682a34f97ff10b90f22f60d9fea2bc5b81b7/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java#L672

```
24/11/20 16:15:41 ERROR Executor: Exception in task 16238.0 in stage 9.0 (TID 108550)
java.lang.NullPointerException
	at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.fillBuffer(CelebornInputStream.java:672)
	at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.read(CelebornInputStream.java:515)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at java.io.DataInputStream.read(DataInputStream.java:149)
	at org.sparkproject.guava.io.ByteStreams.read(ByteStreams.java:899)
	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:733)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:496)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:756)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
	at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
GA.

Closes #2933 from turboFei/npe_reader.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
(cherry picked from commit 094fe28)
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
@turboFei turboFei marked this pull request as draft November 21, 2024 08:30
@turboFei
Copy link
Member Author

The UT is invalid, checking

@turboFei turboFei changed the title [CELEBORN-1720] Prevent stage re-run if another task attempt is running or successful [WIP][CELEBORN-1720] Prevent stage re-run if another task attempt is running or successful Nov 21, 2024
@turboFei
Copy link
Member Author

turboFei commented Nov 22, 2024

@turboFei
Copy link
Member Author

turboFei commented Nov 22, 2024

https://github.com/apache/spark/blob/d5da49d56d7dec5f8a96c5252384d865f7efd4d9/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L322-L346

image

and it is also not allowed to launch speculative task on the same host.

@FMX

I have to give up the UT for speculation ...

And only add UT for SparkUtils.

@turboFei turboFei requested review from FMX and removed request for pan3793 November 28, 2024 12:10
@turboFei
Copy link
Member Author

turboFei commented Dec 1, 2024

could you help have a review? I think it is ready. @FMX @RexXiong

@turboFei
Copy link
Member Author

turboFei commented Dec 4, 2024

Could you help take a look? @FMX thank you

@turboFei
Copy link
Member Author

ping @AngersZhuuuu

Copy link
Member

@pan3793 pan3793 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gone through the spark3 code path, lgtm

@turboFei turboFei force-pushed the task_id branch 2 times, most recently from 1c22a21 to 5d56b07 Compare December 20, 2024 08:08
@turboFei turboFei marked this pull request as draft December 20, 2024 08:11
@turboFei turboFei force-pushed the task_id branch 3 times, most recently from 52c3066 to 549f11c Compare December 20, 2024 08:39
@turboFei turboFei marked this pull request as ready for review December 20, 2024 08:44
Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting PR, thanks for working on it @turboFei !

@turboFei turboFei requested a review from mridulm December 21, 2024 04:12
Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding support for this @turboFei !

@turboFei turboFei requested review from RexXiong and FMX and removed request for FMX December 23, 2024 03:27
@turboFei
Copy link
Member Author

rebased the code.

cc @FMX and @RexXiong could you help take a look?

task_id

Align the LOG

spark3 only

Spark 2 (#30)

ut (#31)

revert ut

Refine the check

ut (#33)

log

Ut for spark utils (#36)

comments

record the reported shuffle fetch failure tasks (#42)

nit

Address comments from mridul (#44)

* revert logger => LOG

* taskScheduler instance lock and stage uniq id

* docs

* listener

* spark 2

* comments

* test
@turboFei
Copy link
Member Author

rebased the code and refined the UT.

cc @FMX and @RexXiong

Copy link

codecov bot commented Dec 27, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 32.67%. Comparing base (fde6365) to head (c35172c).
Report is 5 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2921      +/-   ##
==========================================
- Coverage   32.68%   32.67%   -0.00%     
==========================================
  Files         336      336              
  Lines       20032    20032              
  Branches     1792     1792              
==========================================
- Hits         6546     6544       -2     
- Misses      13123    13124       +1     
- Partials      363      364       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants