[CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight #2725
Conversation
Force-pushed from 8596f52 to d527122
Force-pushed from d4bc018 to ad7448c
Review thread on common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java (outdated, resolved)
After discussing with @SteNicholas, we came up with two reasons why `removeBatch` is called after cleanup:
I suggest doing this:
Agreed, scenario 2 often happens in our production environment.
We can't focus only on `limitZeroInFlight`; `limitMaxInFlight` needs the same handling. cc @waitinfuture
Force-pushed from d3174da to 4b8ddea
Force-pushed from 4b8ddea to f452a04
Does your code contain this? #2191
Yes.
LGTM.
Review thread on common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java (outdated, resolved)
…alInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight
Force-pushed from f452a04 to e5204c9
…alInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight

### What changes were proposed in this pull request?

`InFlightRequestTracker` should not reset `totalInflightReqs` when cleaning up, to avoid a negative `totalInflightReqs` in `limitZeroInFlight`. Follow-up to #2621.

### Why are the changes needed?

After #2621, there is a common case in which attempt 0 succeeds in writing and the Celeborn client successfully calls `mapperEnd`, but the attempt then fails due to an exception such as `ExecutorLostFailure`. Meanwhile, other attempts are rerun and clean up `pushState` because the mapper has ended. This case triggers the `Waiting timeout for task %s while limiting zero in-flight requests` exception in `limitZeroInFlight`. Therefore, `InFlightRequestTracker` should not reset `totalInflightReqs` during cleanup; instead, it uses a `cleaned` flag in `limitMaxInFlight` and `limitZeroInFlight`.

![image](https://github.com/user-attachments/assets/3b66d42e-5d6a-411f-8c3a-360349897ab7)

```
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] WARN InFlightRequestTracker: Clear InFlightRequestTracker
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.36.10:9092, creating a new one.
24/09/05 08:27:04 [data-client-5-1] WARN InFlightRequestTracker: BatchIdSet of 172.27.164.39:9092 is null.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.168.38:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.36.31:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.160.19:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.32.31:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.44.32:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.168.18:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.172.18:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.164.19:9092, creating a new one.
24/09/05 08:27:15 [dispatcher-Executor] INFO Executor: Executor is trying to kill task 706.1 in stage 1.0 (TID 1203), reason: another attempt succeeded
24/09/05 08:27:15 [Executor task launch worker for task 706.1 in stage 1.0 (TID 1203)] INFO Executor: Executor interrupted and killed task 706.1 in stage 1.0 (TID 1203), reason: another attempt succeeded
24/09/05 08:47:08 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] ERROR InFlightRequestTracker: After waiting for 1200000 ms, there are still [] in flight, which exceeds the current limit 0.
24/09/05 08:47:08 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] ERROR Executor: Exception in task 715.2 in stage 1.0 (TID 1205)
org.apache.celeborn.common.exception.CelebornIOException: Waiting timeout for task 1-715-2 while limiting zero in-flight requests
    at org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:676)
    at org.apache.celeborn.client.ShuffleClientImpl.mapEndInternal(ShuffleClientImpl.java:1555)
    at org.apache.celeborn.client.ShuffleClientImpl.mapperEnd(ShuffleClientImpl.java:1539)
    at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.close(HashBasedShuffleWriter.java:367)
    at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.write(HashBasedShuffleWriter.java:175)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:144)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:598)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:603)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
24/09/05 08:49:21 [dispatcher-Executor] INFO YarnCoarseGrainedExecutorBackend: Driver commanded a shutdown
24/09/05 08:49:21 [CoarseGrainedExecutorBackend-stop-executor] WARN ShuffleClientImpl: Shuffle client has been shutdown!
24/09/05 08:49:21 [CoarseGrainedExecutorBackend-stop-executor] INFO MemoryStore: MemoryStore cleared
24/09/05 08:49:21 [CoarseGrainedExecutorBackend-stop-executor] INFO BlockManager: BlockManager stopped
24/09/05 08:49:21 [pool-5-thread-1] INFO ShutdownHookManager: Shutdown hook called
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.

Closes #2725 from SteNicholas/CELEBORN-1506.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
(cherry picked from commit 5875f68)
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
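For context, here is a minimal Java sketch of the `cleaned`-flag approach the description outlines. It is illustrative only: the names `totalInflightReqs`, `cleaned`, `removeBatch`, `limitZeroInFlight`, and `limitMaxInFlight` come from the discussion above, but the signatures, wait parameters, and class structure are assumptions rather than the actual `InFlightRequestTracker` code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch of the cleaned-flag idea; not the real Celeborn class.
public class InFlightRequestTrackerSketch {
  private final LongAdder totalInflightReqs = new LongAdder();
  // Set by cleanup(); the limit methods check this flag instead of the
  // counter being reset, so a late removeBatch cannot drive it negative.
  private final AtomicBoolean cleaned = new AtomicBoolean(false);

  public void addBatch() {
    totalInflightReqs.increment();
  }

  public void removeBatch() {
    // Safe even after cleanup(): every decrement still pairs with an
    // earlier increment because the counter is never reset to zero.
    totalInflightReqs.decrement();
  }

  /** Waits until no requests are in flight, or gives up after maxWaitMs. */
  public boolean limitZeroInFlight(long maxWaitMs, long checkIntervalMs)
      throws InterruptedException {
    long waited = 0;
    while (waited < maxWaitMs) {
      // If the tracker was cleaned up (e.g. another attempt already ended
      // the mapper), there is nothing left to wait for.
      if (cleaned.get() || totalInflightReqs.sum() <= 0) {
        return true;
      }
      Thread.sleep(checkIntervalMs);
      waited += checkIntervalMs;
    }
    return false; // caller raises the "Waiting timeout ..." exception
  }

  /** Waits until at most maxInFlight requests remain in flight. */
  public boolean limitMaxInFlight(int maxInFlight, long maxWaitMs, long checkIntervalMs)
      throws InterruptedException {
    long waited = 0;
    while (waited < maxWaitMs) {
      if (cleaned.get() || totalInflightReqs.sum() <= maxInFlight) {
        return true;
      }
      Thread.sleep(checkIntervalMs);
      waited += checkIntervalMs;
    }
    return false;
  }

  public void cleanup() {
    // Key change: mark the tracker cleaned instead of resetting
    // totalInflightReqs, so waiters exit promptly and the counter can
    // never go below zero.
    cleaned.set(true);
  }
}
```

The design point is that cleanup becomes harmless with respect to the counter: waiters exit via the flag, while straggling `removeBatch` calls leave only a stale but non-negative count behind.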
Thanks, merged to main (v0.6.0), branch-0.5 (v0.5.2), and branch-0.4 (v0.4.3).