Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-46895][CORE] Replace Timer with single thread scheduled executor #44718

Closed

Conversation

beliefer
Copy link
Contributor

What changes were proposed in this pull request?

This PR propose to replace Timer with single thread scheduled executor.

Why are the changes needed?

The javadoc recommends ScheduledThreadPoolExecutor instead of Timer.
屏幕快照 2024-01-12 下午12 47 57

This change based on the following two points.
System time sensitivity

Timer scheduling is based on the absolute time of the operating system and is sensitive to the operating system's time. Once the operating system's time changes, Timer scheduling is no longer precise.
The scheduled Thread Pool Executor scheduling is based on relative time and is not affected by changes in operating system time.

Are anomalies captured

Timer does not capture exceptions thrown by Timer Tasks, and in addition, Timer is single threaded. Once a scheduling task encounters an exception, the entire thread will terminate and other tasks that need to be scheduled will no longer be executed.
The scheduled Thread Pool Executor implements scheduling functions based on a thread pool. After a task throws an exception, other tasks can still execute normally.

Does this PR introduce any user-facing change?

'No'.

How was this patch tested?

GA tests.

Was this patch authored or co-authored using generative AI tooling?

'No'.

@beliefer beliefer force-pushed the replace-timer-with-threadpool branch 2 times, most recently from 15f80b4 to a1cade9 Compare January 14, 2024 03:16
@beliefer beliefer changed the title [WIP][CORE] Replace Timer with single thread scheduled executor [SPARK-46698][CORE][FOLLOWUP] Replace Timer with single thread scheduled executor Jan 14, 2024
@beliefer
Copy link
Contributor Author

beliefer commented Jan 15, 2024

@dongjoon-hyun @srowen @LuciferYang Please help me to take a review.

@@ -168,7 +170,7 @@ private[spark] class BarrierCoordinator(
// we may timeout for the sync.
if (requesters.isEmpty) {
initTimerTask(this)
timer.schedule(timerTask, timeoutInSecs * 1000)
timer.schedule(timerTask, timeoutInSecs * 1000, TimeUnit.MILLISECONDS)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just remove "* 1000" and specify units as SECONDS

@beliefer
Copy link
Contributor Author

cc @dongjoon-hyun @LuciferYang .

@@ -71,7 +72,7 @@ class BarrierTaskContext private[spark] (
}
}
// Log the update of global sync every 60 seconds.
timer.schedule(timerTask, 60000, 60000)
timer.scheduleAtFixedRate(timerTask, 60000, 60000, TimeUnit.MILLISECONDS)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same idea here, this is just 1 minute

@beliefer beliefer force-pushed the replace-timer-with-threadpool branch from 7c6530a to db8f03c Compare January 19, 2024 13:43
starvationTimer.cancel()
abortTimer.cancel()
starvationTimer.shutdown()
abortTimer.shutdown()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine, but would it be more appropriate to use ThreadUtils.shutdown? I am not sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

@@ -128,7 +125,8 @@ private LauncherServer() throws IOException {
this.threadIds = new AtomicLong();
this.factory = new NamedThreadFactory(THREAD_NAME_FMT);
this.secretToPendingApps = new ConcurrentHashMap<>();
this.timeoutTimer = new Timer("LauncherServer-TimeoutTimer", true);
this.timeoutTimer = new ScheduledThreadPoolExecutor(
1, new NamedThreadFactory("LauncherServer-TimeoutTimer"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Indentation

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to avoid using import *

@beliefer beliefer force-pushed the replace-timer-with-threadpool branch 3 times, most recently from 8d4176b to 562c725 Compare January 21, 2024 05:17
@beliefer
Copy link
Contributor Author

The GA failure is unrelated.

@beliefer beliefer force-pushed the replace-timer-with-threadpool branch 3 times, most recently from 86060fd to fdbf3b6 Compare January 23, 2024 03:53
Copy link
Contributor

@LuciferYang LuciferYang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine to me.

But I hope @srowen could approve before merge. Thanks ~

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@beliefer . Although it may share the same target, this does look like a follow-up for the main PR about ConsoleProgressBar.scala as of now. Given that the change affects many other places, please make a new JIRA ID.

@beliefer beliefer changed the title [SPARK-46698][CORE][FOLLOWUP] Replace Timer with single thread scheduled executor [SPARK-46895][CORE] Replace Timer with single thread scheduled executor Jan 29, 2024
@beliefer
Copy link
Contributor Author

@dongjoon-hyun new jira ID created.

@dongjoon-hyun
Copy link
Member

Thank you. Could you re-trigger the failed test pipelines?

@beliefer
Copy link
Contributor Author

@dongjoon-hyun The GA failure is unrelated.

@beliefer beliefer force-pushed the replace-timer-with-threadpool branch 2 times, most recently from 9de4cb5 to 197c164 Compare February 1, 2024 01:45
@dongjoon-hyun
Copy link
Member

dongjoon-hyun commented Feb 3, 2024

Thank you for updating. Could you make CI happy?

[info] *** 23 TESTS FAILED ***
[error] Failed tests:
[error] 	org.apache.spark.deploy.yarn.YarnShuffleAlternateNameConfigWithLevelDBBackendSuite
[error] 	org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite
[error] 	org.apache.spark.deploy.yarn.YarnClusterSuite
[error] 	org.apache.spark.deploy.yarn.YarnShuffleAuthWithLevelDBBackendSuite
[error] 	org.apache.spark.deploy.yarn.YarnShuffleAlternateNameConfigWithRocksDBBackendSuite
[error] 	org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithRocksDBBackendSuite
[error] 	org.apache.spark.deploy.yarn.YarnShuffleAuthWithRocksDBBackendSuite
[error] (yarn / Test / test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 662 s (11:02), completed Feb 1, 2024, 3:01:15 AM

@beliefer beliefer force-pushed the replace-timer-with-threadpool branch from 197c164 to 735df21 Compare February 4, 2024 01:58
@beliefer
Copy link
Contributor Author

beliefer commented Feb 4, 2024

Thank you for updating. Could you make CI happy?

I will try again.

@dongjoon-hyun
Copy link
Member

Thank you, @beliefer .

@beliefer beliefer force-pushed the replace-timer-with-threadpool branch from b1dc93c to db8c305 Compare February 5, 2024 03:29
@beliefer
Copy link
Contributor Author

beliefer commented Feb 5, 2024

@dongjoon-hyun All the tests passed.

Copy link
Member

@srowen srowen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's OK

@@ -283,6 +284,7 @@ object BarrierTaskContext {
@Since("2.4.0")
def get(): BarrierTaskContext = TaskContext.get().asInstanceOf[BarrierTaskContext]

private val timer = new Timer("Barrier task timer for barrier() calls.")
private val timer = ThreadUtils.newSingleThreadScheduledExecutor(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm I guess there is no place to shut this down. It's not critical and not something we have to change here. I wonder if it should be a daemon thread though? seems weird to spawn a thread that nothing in the code can terminate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The origin Timer is not a daemon thread, so I followed the same behavior.
There are some code used to cancel the timer task.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That doesn't shut down the executor though, and the other code paths do. I don't see how the non-daemon thread ever terminates. It may not matter as the Spark context lives as long as the app. It'd be tidy to shut it down, or make it a daemon anyway, but not strictly related to the change you're making

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Let's change it to daemon thread.

@LuciferYang
Copy link
Contributor

All test passed. Merged into master for Spark 4.0. Thanks @beliefer @srowen and @dongjoon-hyun ~

@beliefer
Copy link
Contributor Author

beliefer commented Feb 6, 2024

@LuciferYang @srowen @dongjoon-hyun Thank you!

jshmchenxi pushed a commit to jshmchenxi/spark that referenced this pull request Aug 31, 2024
### What changes were proposed in this pull request?
This PR propose to replace `Timer` with single thread scheduled executor.

### Why are the changes needed?
The javadoc recommends `ScheduledThreadPoolExecutor` instead of `Timer`.
![屏幕快照 2024-01-12 下午12 47 57](https://github.com/apache/spark/assets/8486025/4fc5ed61-6bb9-4768-915a-ad919a067d04)

This change based on the following two points.
**System time sensitivity**

Timer scheduling is based on the absolute time of the operating system and is sensitive to the operating system's time. Once the operating system's time changes, Timer scheduling is no longer precise.
The scheduled Thread Pool Executor scheduling is based on relative time and is not affected by changes in operating system time.

**Are anomalies captured**

Timer does not capture exceptions thrown by Timer Tasks, and in addition, Timer is single threaded. Once a scheduling task encounters an exception, the entire thread will terminate and other tasks that need to be scheduled will no longer be executed.
The scheduled Thread Pool Executor implements scheduling functions based on a thread pool. After a task throws an exception, other tasks can still execute normally.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
GA tests.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes apache#44718 from beliefer/replace-timer-with-threadpool.

Authored-by: beliefer <beliefer@163.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit 5d5b3a5)
jshmchenxi pushed a commit to jshmchenxi/spark that referenced this pull request Sep 2, 2024
### What changes were proposed in this pull request?
This PR propose to replace `Timer` with single thread scheduled executor.

### Why are the changes needed?
The javadoc recommends `ScheduledThreadPoolExecutor` instead of `Timer`.
![屏幕快照 2024-01-12 下午12 47 57](https://github.com/apache/spark/assets/8486025/4fc5ed61-6bb9-4768-915a-ad919a067d04)

This change based on the following two points.
**System time sensitivity**

Timer scheduling is based on the absolute time of the operating system and is sensitive to the operating system's time. Once the operating system's time changes, Timer scheduling is no longer precise.
The scheduled Thread Pool Executor scheduling is based on relative time and is not affected by changes in operating system time.

**Are anomalies captured**

Timer does not capture exceptions thrown by Timer Tasks, and in addition, Timer is single threaded. Once a scheduling task encounters an exception, the entire thread will terminate and other tasks that need to be scheduled will no longer be executed.
The scheduled Thread Pool Executor implements scheduling functions based on a thread pool. After a task throws an exception, other tasks can still execute normally.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
GA tests.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes apache#44718 from beliefer/replace-timer-with-threadpool.

Authored-by: beliefer <beliefer@163.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit 5d5b3a5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants