@@ -125,6 +125,7 @@ object StateStore extends Logging {
125
125
val MAINTENANCE_INTERVAL_CONFIG = " spark.sql.streaming.stateStore.maintenanceInterval"
126
126
val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60
127
127
128
+ @ GuardedBy (" loadedProviders" )
128
129
private val loadedProviders = new mutable.HashMap [StateStoreId , StateStoreProvider ]()
129
130
130
131
class MaintenanceTask (periodMs : Long , task : => Unit , onError : => Unit ) {
@@ -138,25 +139,28 @@ object StateStore extends Logging {
138
139
} catch {
139
140
case NonFatal (e) =>
140
141
logWarning(" Error running maintenance thread" , e)
141
- future.cancel(false ) // do interrupt active run, as this is being called from the run
142
142
onError
143
+ throw e
143
144
}
144
145
}
145
146
}
146
147
147
148
private val future : ScheduledFuture [_] = executor.scheduleAtFixedRate(
148
149
runnable, periodMs, periodMs, TimeUnit .MILLISECONDS )
149
150
150
- def stop (): Unit = { future.cancel(false ) }
151
+ def stop (): Unit = {
152
+ future.cancel(false )
153
+ executor.shutdown()
154
+ }
151
155
152
- def isRunning : Boolean = ! future.isCancelled
156
+ def isRunning : Boolean = ! future.isDone
153
157
}
154
158
155
159
@ GuardedBy (" loadedProviders" )
156
- @ volatile private var maintenanceTask : MaintenanceTask = null
160
+ private var maintenanceTask : MaintenanceTask = null
157
161
158
162
@ GuardedBy (" loadedProviders" )
159
- @ volatile private var _coordRef : StateStoreCoordinatorRef = null
163
+ private var _coordRef : StateStoreCoordinatorRef = null
160
164
161
165
/** Get or create a store associated with the id. */
162
166
def get (
@@ -238,6 +242,7 @@ object StateStore extends Logging {
238
242
} catch {
239
243
case NonFatal (e) =>
240
244
logWarning(s " Error managing $provider, stopping management thread " )
245
+ throw e
241
246
}
242
247
}
243
248
}
@@ -263,7 +268,7 @@ object StateStore extends Logging {
263
268
}
264
269
}
265
270
266
- private def coordinatorRef : Option [StateStoreCoordinatorRef ] = synchronized {
271
+ private def coordinatorRef : Option [StateStoreCoordinatorRef ] = loadedProviders. synchronized {
267
272
val env = SparkEnv .get
268
273
if (env != null ) {
269
274
if (_coordRef == null ) {
0 commit comments