Skip to content

Commit d66f8d7

Browse files
committed
Address TD's comments
1 parent cbcd6e8 commit d66f8d7

File tree

1 file changed

+5
-1
lines changed
  • sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state

1 file changed

+5
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ object StateStore extends Logging {
128128
@GuardedBy("loadedProviders")
129129
private val loadedProviders = new mutable.HashMap[StateStoreId, StateStoreProvider]()
130130

131+
/**
132+
* Runs the `task` periodically and automatically cancels it if there is an exception. `onError`
133+
* will be called when an exception happens.
134+
*/
131135
class MaintenanceTask(periodMs: Long, task: => Unit, onError: => Unit) {
132136
private val executor =
133137
ThreadUtils.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task")
@@ -210,7 +214,7 @@ object StateStore extends Logging {
210214
/** Start the periodic maintenance task if not already started and if Spark active */
211215
private def startMaintenanceIfNeeded(): Unit = loadedProviders.synchronized {
212216
val env = SparkEnv.get
213-
if (env != null && (maintenanceTask == null || !maintenanceTask.isRunning)) {
217+
if (env != null && !isMaintenanceRunning) {
214218
val periodMs = env.conf.getTimeAsMs(
215219
MAINTENANCE_INTERVAL_CONFIG, s"${MAINTENANCE_INTERVAL_DEFAULT_SECS}s")
216220
maintenanceTask = new MaintenanceTask(

0 commit comments

Comments
 (0)