-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-46895][CORE] Replace Timer with single thread scheduled executor #44718
Conversation
15f80b4
to
a1cade9
Compare
@dongjoon-hyun @srowen @LuciferYang Please help me to take a review. |
@@ -168,7 +170,7 @@ private[spark] class BarrierCoordinator( | |||
// we may timeout for the sync. | |||
if (requesters.isEmpty) { | |||
initTimerTask(this) | |||
timer.schedule(timerTask, timeoutInSecs * 1000) | |||
timer.schedule(timerTask, timeoutInSecs * 1000, TimeUnit.MILLISECONDS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can just remove "* 1000" and specify units as SECONDS
@@ -71,7 +72,7 @@ class BarrierTaskContext private[spark] ( | |||
} | |||
} | |||
// Log the update of global sync every 60 seconds. | |||
timer.schedule(timerTask, 60000, 60000) | |||
timer.scheduleAtFixedRate(timerTask, 60000, 60000, TimeUnit.MILLISECONDS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same idea here, this is just 1 minute
7c6530a
to
db8f03c
Compare
starvationTimer.cancel() | ||
abortTimer.cancel() | ||
starvationTimer.shutdown() | ||
abortTimer.shutdown() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fine, but would it be more appropriate to use ThreadUtils.shutdown
? I am not sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good.
@@ -128,7 +125,8 @@ private LauncherServer() throws IOException { | |||
this.threadIds = new AtomicLong(); | |||
this.factory = new NamedThreadFactory(THREAD_NAME_FMT); | |||
this.secretToPendingApps = new ConcurrentHashMap<>(); | |||
this.timeoutTimer = new Timer("LauncherServer-TimeoutTimer", true); | |||
this.timeoutTimer = new ScheduledThreadPoolExecutor( | |||
1, new NamedThreadFactory("LauncherServer-TimeoutTimer")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Indentation
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentMap; | ||
import java.util.concurrent.ThreadFactory; | ||
import java.util.concurrent.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import java.util.concurrent.*; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.ScheduledThreadPoolExecutor; | |
import java.util.concurrent.ThreadFactory; | |
import java.util.concurrent.TimeUnit; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to avoid using import *
8d4176b
to
562c725
Compare
The GA failure is unrelated. |
86060fd
to
fdbf3b6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fine to me.
But I hope @srowen could approve before merge. Thanks ~
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@beliefer . Although it may share the same target, this does look like a follow-up for the main PR about ConsoleProgressBar.scala
as of now. Given that the change affects many other places, please make a new JIRA ID.
@dongjoon-hyun new jira ID created. |
Thank you. Could you re-trigger the failed test pipelines? |
@dongjoon-hyun The GA failure is unrelated. |
9de4cb5
to
197c164
Compare
Thank you for updating. Could you make CI happy?
|
197c164
to
735df21
Compare
I will try again. |
Thank you, @beliefer . |
b1dc93c
to
db8c305
Compare
@dongjoon-hyun All the tests passed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's OK
@@ -283,6 +284,7 @@ object BarrierTaskContext { | |||
@Since("2.4.0") | |||
def get(): BarrierTaskContext = TaskContext.get().asInstanceOf[BarrierTaskContext] | |||
|
|||
private val timer = new Timer("Barrier task timer for barrier() calls.") | |||
private val timer = ThreadUtils.newSingleThreadScheduledExecutor( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm I guess there is no place to shut this down. It's not critical and not something we have to change here. I wonder if it should be a daemon thread though? seems weird to spawn a thread that nothing in the code can terminate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The origin Timer is not a daemon thread, so I followed the same behavior.
There are some code used to cancel the timer task.
timerTask.cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That doesn't shut down the executor though, and the other code paths do. I don't see how the non-daemon thread ever terminates. It may not matter as the Spark context lives as long as the app. It'd be tidy to shut it down, or make it a daemon anyway, but not strictly related to the change you're making
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. Let's change it to daemon thread.
All test passed. Merged into master for Spark 4.0. Thanks @beliefer @srowen and @dongjoon-hyun ~ |
@LuciferYang @srowen @dongjoon-hyun Thank you! |
### What changes were proposed in this pull request? This PR propose to replace `Timer` with single thread scheduled executor. ### Why are the changes needed? The javadoc recommends `ScheduledThreadPoolExecutor` instead of `Timer`. ![屏幕快照 2024-01-12 下午12 47 57](https://github.com/apache/spark/assets/8486025/4fc5ed61-6bb9-4768-915a-ad919a067d04) This change based on the following two points. **System time sensitivity** Timer scheduling is based on the absolute time of the operating system and is sensitive to the operating system's time. Once the operating system's time changes, Timer scheduling is no longer precise. The scheduled Thread Pool Executor scheduling is based on relative time and is not affected by changes in operating system time. **Are anomalies captured** Timer does not capture exceptions thrown by Timer Tasks, and in addition, Timer is single threaded. Once a scheduling task encounters an exception, the entire thread will terminate and other tasks that need to be scheduled will no longer be executed. The scheduled Thread Pool Executor implements scheduling functions based on a thread pool. After a task throws an exception, other tasks can still execute normally. ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? GA tests. ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes apache#44718 from beliefer/replace-timer-with-threadpool. Authored-by: beliefer <beliefer@163.com> Signed-off-by: yangjie01 <yangjie01@baidu.com> (cherry picked from commit 5d5b3a5)
### What changes were proposed in this pull request? This PR propose to replace `Timer` with single thread scheduled executor. ### Why are the changes needed? The javadoc recommends `ScheduledThreadPoolExecutor` instead of `Timer`. ![屏幕快照 2024-01-12 下午12 47 57](https://github.com/apache/spark/assets/8486025/4fc5ed61-6bb9-4768-915a-ad919a067d04) This change based on the following two points. **System time sensitivity** Timer scheduling is based on the absolute time of the operating system and is sensitive to the operating system's time. Once the operating system's time changes, Timer scheduling is no longer precise. The scheduled Thread Pool Executor scheduling is based on relative time and is not affected by changes in operating system time. **Are anomalies captured** Timer does not capture exceptions thrown by Timer Tasks, and in addition, Timer is single threaded. Once a scheduling task encounters an exception, the entire thread will terminate and other tasks that need to be scheduled will no longer be executed. The scheduled Thread Pool Executor implements scheduling functions based on a thread pool. After a task throws an exception, other tasks can still execute normally. ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? GA tests. ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes apache#44718 from beliefer/replace-timer-with-threadpool. Authored-by: beliefer <beliefer@163.com> Signed-off-by: yangjie01 <yangjie01@baidu.com> (cherry picked from commit 5d5b3a5)
What changes were proposed in this pull request?
This PR propose to replace
Timer
with single thread scheduled executor.Why are the changes needed?
The javadoc recommends
ScheduledThreadPoolExecutor
instead ofTimer
.This change based on the following two points.
System time sensitivity
Timer scheduling is based on the absolute time of the operating system and is sensitive to the operating system's time. Once the operating system's time changes, Timer scheduling is no longer precise.
The scheduled Thread Pool Executor scheduling is based on relative time and is not affected by changes in operating system time.
Are anomalies captured
Timer does not capture exceptions thrown by Timer Tasks, and in addition, Timer is single threaded. Once a scheduling task encounters an exception, the entire thread will terminate and other tasks that need to be scheduled will no longer be executed.
The scheduled Thread Pool Executor implements scheduling functions based on a thread pool. After a task throws an exception, other tasks can still execute normally.
Does this PR introduce any user-facing change?
'No'.
How was this patch tested?
GA tests.
Was this patch authored or co-authored using generative AI tooling?
'No'.