[CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight #2725

Closed

Conversation

SteNicholas
Member

@SteNicholas SteNicholas commented Sep 5, 2024

What changes were proposed in this pull request?

InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight.

Follow-up to #2621.

Why are the changes needed?

After #2621, there is a common case in which attempt 0 succeeds in its write and the Celeborn client successfully calls `mapperEnd`, but the attempt is then marked as failed due to an exception such as `ExecutorLostFailure`. Other attempts are rerun and clean up the `pushState` because the mapper has already ended, which makes `limitZeroInFlight` fail with `Waiting timeout for task %s while limiting zero in-flight requests`. Therefore, `InFlightRequestTracker` should not reset `totalInflightReqs` during cleanup, which would otherwise leave `totalInflightReqs` negative for `limitZeroInFlight`; instead, `InFlightRequestTracker` uses a `cleaned` flag in `limitMaxInFlight` and `limitZeroInFlight`.
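As a rough illustration of the mechanism described above, here is a minimal sketch. The class, field, and method names are simplified assumptions for illustration only, not the actual Celeborn `InFlightRequestTracker` code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Simplified sketch of the idea described above; names and structure are
// assumptions for illustration, not the actual Celeborn InFlightRequestTracker.
class InFlightRequestTrackerSketch {
  private final LongAdder totalInflightReqs = new LongAdder();
  private final ConcurrentHashMap<String, Set<Integer>> inflightBatchesPerAddress =
      new ConcurrentHashMap<>();
  // Once the tracker is cleaned up, the in-flight checks below short-circuit
  // instead of waiting on a counter that may never reach zero again.
  private volatile boolean cleaned = false;

  void addBatch(String address, int batchId) {
    inflightBatchesPerAddress
        .computeIfAbsent(address, k -> ConcurrentHashMap.newKeySet())
        .add(batchId);
    totalInflightReqs.increment();
  }

  void removeBatch(String address, int batchId) {
    Set<Integer> batches = inflightBatchesPerAddress.get(address);
    if (batches != null) {
      batches.remove(batchId);
    }
    totalInflightReqs.decrement();
  }

  void cleanup() {
    inflightBatchesPerAddress.clear();
    // Deliberately do NOT reset totalInflightReqs here: a late removeBatch()
    // from an in-flight callback would otherwise drive the counter negative.
    cleaned = true;
  }

  boolean limitZeroInFlight(long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!cleaned && totalInflightReqs.sum() > 0) {
      if (System.currentTimeMillis() > deadline) {
        return false; // caller raises "Waiting timeout ... limiting zero in-flight requests"
      }
      Thread.sleep(50);
    }
    return true; // a cleaned-up tracker succeeds immediately
  }
}
```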

![image](https://github.com/user-attachments/assets/3b66d42e-5d6a-411f-8c3a-360349897ab7)

```
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] WARN InFlightRequestTracker: Clear InFlightRequestTracker
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.36.10:9092, creating a new one.
24/09/05 08:27:04 [data-client-5-1] WARN InFlightRequestTracker: BatchIdSet of 172.27.164.39:9092 is null.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.168.38:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.36.31:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.160.19:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.32.31:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.44.32:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.168.18:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.172.18:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.164.19:9092, creating a new one.
24/09/05 08:27:15 [dispatcher-Executor] INFO Executor: Executor is trying to kill task 706.1 in stage 1.0 (TID 1203), reason: another attempt succeeded
24/09/05 08:27:15 [Executor task launch worker for task 706.1 in stage 1.0 (TID 1203)] INFO Executor: Executor interrupted and killed task 706.1 in stage 1.0 (TID 1203), reason: another attempt succeeded
24/09/05 08:47:08 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] ERROR InFlightRequestTracker: After waiting for 1200000 ms, there are still [] in flight, which exceeds the current limit 0.
24/09/05 08:47:08 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] ERROR Executor: Exception in task 715.2 in stage 1.0 (TID 1205)
org.apache.celeborn.common.exception.CelebornIOException: Waiting timeout for task 1-715-2 while limiting zero in-flight requests
	at org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:676)
	at org.apache.celeborn.client.ShuffleClientImpl.mapEndInternal(ShuffleClientImpl.java:1555)
	at org.apache.celeborn.client.ShuffleClientImpl.mapperEnd(ShuffleClientImpl.java:1539)
	at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.close(HashBasedShuffleWriter.java:367)
	at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.write(HashBasedShuffleWriter.java:175)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:144)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:598)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:603)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
24/09/05 08:49:21 [dispatcher-Executor] INFO YarnCoarseGrainedExecutorBackend: Driver commanded a shutdown
24/09/05 08:49:21 [CoarseGrainedExecutorBackend-stop-executor] WARN ShuffleClientImpl: Shuffle client has been shutdown!
24/09/05 08:49:21 [CoarseGrainedExecutorBackend-stop-executor] INFO MemoryStore: MemoryStore cleared
24/09/05 08:49:21 [CoarseGrainedExecutorBackend-stop-executor] INFO BlockManager: BlockManager stopped
24/09/05 08:49:21 [pool-5-thread-1] INFO ShutdownHookManager: Shutdown hook called
```

Does this PR introduce any user-facing change?

No.

How was this patch tested?

No.

@SteNicholas SteNicholas changed the title [CELEBORN-1506] totalInflightReqs should decrement when inflightBatchesPerAddress isn't empty to avoid negative totalInflightReqs for limitZeroInFlight [CELEBORN-1506] InFlightRequestTracker should decrement totalInflightReqs when inflightBatchesPerAddress isn't empty to avoid negative totalInflightReqs for limitZeroInFlight Sep 5, 2024
@SteNicholas SteNicholas changed the title [CELEBORN-1506] InFlightRequestTracker should decrement totalInflightReqs when inflightBatchesPerAddress isn't empty to avoid negative totalInflightReqs for limitZeroInFlight [CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should decrement totalInflightReqs when inflightBatchesPerAddress isn't empty to avoid negative totalInflightReqs for limitZeroInFlight Sep 5, 2024
@SteNicholas SteNicholas force-pushed the CELEBORN-1506 branch 3 times, most recently from d4bc018 to ad7448c on September 5, 2024 08:56
@waitinfuture
Contributor

waitinfuture commented Sep 5, 2024

After discussing with @SteNicholas, we thought of two reasons why removeBatch is called after cleanup:

  1. stage retry without [CELEBORN-1496] Differentiate map results with only different stageAttemptId #2609: the retried task has the same (shuffleId, mapId, attemptId), which already mapper-ended in the last retry
  2. attempt 0 succeeds in its write and the Celeborn client successfully called mapperEnd, but Spark considers it failed and reruns other attempts, which then clean up pushState because the mapper has already ended.

@waitinfuture
Contributor

waitinfuture commented Sep 5, 2024

I suggest doing the following (see the sketch after this comment):

  1. do not call totalInflightReqs.reset(); in InFlightRequestTracker.cleanup
  2. check mapperEnd in limitZeroInFlight and return true immediately if already ended
  3. check mapperEnd in pushMergedData and return immediately if already ended

cc @RexXiong @AngersZhuuuu @FMX
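A rough sketch of how the three suggestions above could fit together, reusing the hypothetical tracker sketch shown earlier in the description (suggestion 1 corresponds to its `cleanup()` not resetting the counter). Method names such as `mapperEnded`, `markMapperEnded`, and the `pushMergedData` signature are assumptions for illustration, not the actual `ShuffleClientImpl` API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of suggestions 2 and 3 above; names and signatures are
// assumptions for illustration, not the actual ShuffleClientImpl API.
class MapperEndAwareClientSketch {
  private final Set<String> endedMappers = ConcurrentHashMap.newKeySet();
  private final InFlightRequestTrackerSketch tracker = new InFlightRequestTrackerSketch();

  // Called when mapperEnd succeeds; later attempts for the same mapper become no-ops.
  void markMapperEnded(String mapKey) {
    endedMappers.add(mapKey);
  }

  boolean mapperEnded(String mapKey) {
    return endedMappers.contains(mapKey);
  }

  // Suggestion 2: return true immediately if the mapper already ended, so a
  // rerun attempt never waits on requests it did not send.
  boolean limitZeroInFlight(String mapKey, long timeoutMs) throws InterruptedException {
    if (mapperEnded(mapKey)) {
      return true;
    }
    return tracker.limitZeroInFlight(timeoutMs);
  }

  // Suggestion 3: skip the push entirely if the mapper already ended; the map
  // output was already committed by a previous successful attempt.
  void pushMergedData(String mapKey, byte[] data) {
    if (mapperEnded(mapKey)) {
      return;
    }
    // ... normal merged-data push path would go here ...
  }
}
```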

@leixm
Contributor

leixm commented Sep 6, 2024

After discussing with @SteNicholas, we thought of two reasons why removeBatch is called after cleanup:

  1. stage retry without [CELEBORN-1496] Differentiate map results with only different stageAttemptId #2609: the retried task has the same (shuffleId, mapId, attemptId), which already mapper-ended in the last retry
  2. attempt 0 succeeds in its write and the Celeborn client successfully called mapperEnd, but Spark considers it failed and reruns other attempts, which then clean up pushState because the mapper has already ended.

Agree, scenario 2 often happens in our production environment.

@leixm
Contributor

leixm commented Sep 6, 2024

I suggest doing the following:

  1. do not call totalInflightReqs.reset(); in InFlightRequestTracker.cleanup
  2. check mapperEnd in limitZeroInFlight and return true immediately if already ended
  3. check mapperEnd in pushMergedData and return immediately if already ended

cc @RexXiong @AngersZhuuuu @FMX

We can't focus only on limitZeroInFlight; limitMaxInFlight needs the same handling (see the sketch below). cc @waitinfuture
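For completeness, applying the same short-circuit to limitMaxInFlight could look roughly like the method below, intended as an additional method on the hypothetical `InFlightRequestTrackerSketch` shown earlier (an assumption-laden sketch, not the actual code).

```java
// Sketch only: a per-address bound with the same cleaned short-circuit, meant
// as an additional method on the InFlightRequestTrackerSketch shown earlier.
boolean limitMaxInFlight(String address, int maxInFlight, long timeoutMs)
    throws InterruptedException {
  long deadline = System.currentTimeMillis() + timeoutMs;
  while (!cleaned) {
    Set<Integer> batches = inflightBatchesPerAddress.get(address);
    int inflight = (batches == null) ? 0 : batches.size();
    if (inflight <= maxInFlight) {
      return true;
    }
    if (System.currentTimeMillis() > deadline) {
      return false; // caller reports a max in-flight waiting timeout
    }
    Thread.sleep(50);
  }
  return true; // tracker already cleaned up; nothing left to wait for
}
```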


@SteNicholas SteNicholas force-pushed the CELEBORN-1506 branch 2 times, most recently from d3174da to 4b8ddea on September 6, 2024 04:02
@SteNicholas SteNicholas changed the title [CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should decrement totalInflightReqs when inflightBatchesPerAddress isn't empty to avoid negative totalInflightReqs for limitZeroInFlight [CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight Sep 6, 2024
@waitinfuture
Contributor

I suggest doing the following:

  1. do not call totalInflightReqs.reset(); in InFlightRequestTracker.cleanup
  2. check mapperEnd in limitZeroInFlight and return true immediately if already ended
  3. check mapperEnd in pushMergedData and return immediately if already ended

cc @RexXiong @AngersZhuuuu @FMX

We can't just focus on limitZeroInFlight, but also limitMaxInFlight. cc @waitinfuture


Does your code contain this? #2191

@leixm
Contributor

leixm commented Sep 6, 2024

I suggest doing the following:

  1. do not call totalInflightReqs.reset(); in InFlightRequestTracker.cleanup
  2. check mapperEnd in limitZeroInFlight and return true immediately if already ended
  3. check mapperEnd in pushMergedData and return immediately if already ended

cc @RexXiong @AngersZhuuuu @FMX

We can't just focus on limitZeroInFlight, but also limitMaxInFlight. cc @waitinfuture

Does your code contain this? #2191

Yes.

Contributor

@FMX FMX left a comment


LGTM.

[CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight
@RexXiong RexXiong closed this in 5875f68 Sep 6, 2024
RexXiong pushed a commit that referenced this pull request Sep 6, 2024
[CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight

Closes #2725 from SteNicholas/CELEBORN-1506.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
(cherry picked from commit 5875f68)
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
RexXiong pushed a commit that referenced this pull request Sep 6, 2024
[CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight

Closes #2725 from SteNicholas/CELEBORN-1506.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
(cherry picked from commit 5875f68)
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
@RexXiong
Contributor

RexXiong commented Sep 6, 2024

Thanks, merged to main (v0.6.0), branch-0.5 (v0.5.2), and branch-0.4 (v0.4.3).

s0nskar pushed a commit to s0nskar/celeborn that referenced this pull request Sep 16, 2024
[CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight

Closes apache#2725 from SteNicholas/CELEBORN-1506.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
wankunde pushed a commit to wankunde/celeborn that referenced this pull request Oct 11, 2024
[CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight

Closes apache#2725 from SteNicholas/CELEBORN-1506.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>