
Commit 407c3ce

tdas authored and zsxwing committed
[SPARK-17624][SQL][STREAMING][TEST] Fixed flaky StateStoreSuite.maintenance
## What changes were proposed in this pull request?

The reason for the flakiness was as follows. The test starts the maintenance background thread and then writes 20 versions of the state store. The maintenance thread is expected to create snapshots in the middle and clean up old files that are no longer needed. The earliest delta file (1.delta) is expected to be deleted, as snapshots will ensure that the earliest delta is no longer needed.

However, the default configuration for the maintenance thread is to retain files such that the last 2 versions can be recovered, and delete the rest. While the versions are being generated, the maintenance thread can kick in and create a snapshot anywhere between version 10 and 20 (at least 10 deltas are needed for a snapshot). Later it will choose to retain only versions 20 and 19 (the last 2). There are two cases.

- Common case: One of the versions between 10 and 19 gets snapshotted. Recovering versions 19 and 20 then needs only 19.snapshot and 20.delta, so 1.delta gets deleted.
- Uncommon case (reason for flakiness): Only version 20 gets snapshotted. Recovering version 20 requires 20.snapshot, but recovering version 19 requires all the previous delta files (19.delta down to 1.delta). So 1.delta does not get deleted.

This PR rearranges the checks such that the test creates 20 versions, waits until there is at least one snapshot, and then creates another 20. This ensures that the latest 2 versions cannot require anything older than the first snapshot generated, and therefore 1.delta will be deleted. In addition, I have added more logs and comments that I felt would help future debugging and understanding of what is going on.

## How was this patch tested?

Ran the StateStoreSuite > 6K times on a heavily loaded machine (10 instances of tests running in parallel). No failures.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15592 from tdas/SPARK-17624.
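To make the file-retention reasoning above concrete, here is a minimal, self-contained Scala sketch. The names (StoreFile, filesNeededToRecover, RetentionExample) are illustrative only, not the provider's actual API; it simply models the rule that recovering version v needs the latest snapshot at or below v plus the deltas after it, and reproduces both the common and the uncommon (flaky) case.

object RetentionExample {
  case class StoreFile(version: Int, isSnapshot: Boolean)

  // Files needed to recover version `v`: the latest snapshot at or below `v` plus every
  // delta after it; if no such snapshot exists, all deltas 1..v are needed.
  def filesNeededToRecover(files: Seq[StoreFile], v: Int): Seq[StoreFile] = {
    val snapshot = files.filter(f => f.isSnapshot && f.version <= v).sortBy(_.version).lastOption
    val fromVersion = snapshot.map(_.version + 1).getOrElse(1)
    snapshot.toSeq ++ files.filter(f => !f.isSnapshot && f.version >= fromVersion && f.version <= v)
  }

  def main(args: Array[String]): Unit = {
    val deltas = (1 to 20).map(v => StoreFile(v, isSnapshot = false))

    // Uncommon (flaky) case: only version 20 is snapshotted, so recovering version 19 still
    // needs every delta back to 1.delta, and cleanup cannot remove it.
    val onlyLastSnapshotted = deltas :+ StoreFile(20, isSnapshot = true)
    assert(filesNeededToRecover(onlyLastSnapshotted, 19).exists(_.version == 1))

    // Common case: some version in 10..19 (say 15) is snapshotted, so recovering the last
    // two versions needs nothing older than 15, and 1.delta becomes eligible for deletion.
    val midSnapshotted = deltas :+ StoreFile(15, isSnapshot = true)
    val needed = filesNeededToRecover(midSnapshotted, 19) ++ filesNeededToRecover(midSnapshotted, 20)
    assert(!needed.exists(_.version < 15))
  }
}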
1 parent 81d6933 commit 407c3ce

File tree: 3 files changed (+57, -28 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 12 additions & 6 deletions
@@ -159,7 +159,7 @@ private[state] class HDFSBackedStateStoreProvider(
     } catch {
       case NonFatal(e) =>
         throw new IllegalStateException(
-          s"Error committing version $newVersion into ${HDFSBackedStateStoreProvider.this}", e)
+          s"Error committing version $newVersion into $this", e)
     }
   }

@@ -205,6 +205,10 @@ private[state] class HDFSBackedStateStoreProvider(
     override private[state] def hasCommitted: Boolean = {
       state == COMMITTED
     }
+
+    override def toString(): String = {
+      s"HDFSStateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
+    }
   }

   /** Get the state store for making updates to create a new `version` of the store. */
@@ -215,7 +219,7 @@ private[state] class HDFSBackedStateStoreProvider(
       newMap.putAll(loadMap(version))
     }
     val store = new HDFSBackedStateStore(version, newMap)
-    logInfo(s"Retrieved version $version of $this for update")
+    logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for update")
     store
   }

@@ -231,7 +235,7 @@ private[state] class HDFSBackedStateStoreProvider(
   }

   override def toString(): String = {
-    s"StateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
+    s"HDFSStateStoreProvider[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
   }

   /* Internal classes and methods */
@@ -493,10 +497,12 @@ private[state] class HDFSBackedStateStoreProvider(
           val mapsToRemove = loadedMaps.keys.filter(_ < earliestVersionToRetain).toSeq
           mapsToRemove.foreach(loadedMaps.remove)
         }
-        files.filter(_.version < earliestFileToRetain.version).foreach { f =>
+        val filesToDelete = files.filter(_.version < earliestFileToRetain.version)
+        filesToDelete.foreach { f =>
          fs.delete(f.path, true)
        }
-        logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this")
+        logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this: " +
+          filesToDelete.mkString(", "))
       }
     }
   } catch {
@@ -560,7 +566,7 @@ private[state] class HDFSBackedStateStoreProvider(
       }
     }
     val storeFiles = versionToFiles.values.toSeq.sortBy(_.version)
-    logDebug(s"Current set of files for $this: $storeFiles")
+    logDebug(s"Current set of files for $this: ${storeFiles.mkString(", ")}")
     storeFiles
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala

Lines changed: 13 additions & 5 deletions
@@ -38,7 +38,7 @@ private case class VerifyIfInstanceActive(storeId: StateStoreId, executorId: Str
 private case class GetLocation(storeId: StateStoreId)
   extends StateStoreCoordinatorMessage

-private case class DeactivateInstances(storeRootLocation: String)
+private case class DeactivateInstances(checkpointLocation: String)
   extends StateStoreCoordinatorMessage

 private object StopCoordinator
@@ -111,11 +111,13 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
  * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster,
  * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
+private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
+    extends ThreadSafeRpcEndpoint with Logging {
   private val instances = new mutable.HashMap[StateStoreId, ExecutorCacheTaskLocation]

   override def receive: PartialFunction[Any, Unit] = {
     case ReportActiveInstance(id, host, executorId) =>
+      logDebug(s"Reported state store $id is active at $executorId")
       instances.put(id, ExecutorCacheTaskLocation(host, executorId))
   }

@@ -125,19 +127,25 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadS
         case Some(location) => location.executorId == execId
         case None => false
       }
+      logDebug(s"Verified that state store $id is active: $response")
       context.reply(response)

     case GetLocation(id) =>
-      context.reply(instances.get(id).map(_.toString))
+      val executorId = instances.get(id).map(_.toString)
+      logDebug(s"Got location of the state store $id: $executorId")
+      context.reply(executorId)

-    case DeactivateInstances(loc) =>
+    case DeactivateInstances(checkpointLocation) =>
       val storeIdsToRemove =
-        instances.keys.filter(_.checkpointLocation == loc).toSeq
+        instances.keys.filter(_.checkpointLocation == checkpointLocation).toSeq
       instances --= storeIdsToRemove
+      logDebug(s"Deactivating instances related to checkpoint location $checkpointLocation: " +
+        storeIdsToRemove.mkString(", "))
       context.reply(true)

     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
+      logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

Lines changed: 32 additions & 17 deletions
@@ -367,7 +367,10 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
+      // Make maintenance thread do snapshots and cleanups very fast
       .set(StateStore.MAINTENANCE_INTERVAL_CONFIG, "10ms")
+      // Make sure that when SparkContext stops, the StateStore maintenance thread 'quickly'
+      // fails to talk to the StateStoreCoordinator and unloads all the StateStores
       .set("spark.rpc.numRetries", "1")
     val opId = 0
     val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
@@ -377,37 +380,49 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = new HDFSBackedStateStoreProvider(
       storeId, keySchema, valueSchema, storeConf, hadoopConf)

+    var latestStoreVersion = 0
+
+    def generateStoreVersions() {
+      for (i <- 1 to 20) {
+        val store = StateStore.get(
+          storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
+        put(store, "a", i)
+        store.commit()
+        latestStoreVersion += 1
+      }
+    }

     quietly {
       withSpark(new SparkContext(conf)) { sc =>
         withCoordinatorRef(sc) { coordinatorRef =>
           require(!StateStore.isMaintenanceRunning, "StateStore is unexpectedly running")

-          for (i <- 1 to 20) {
-            val store = StateStore.get(
-              storeId, keySchema, valueSchema, i - 1, storeConf, hadoopConf)
-            put(store, "a", i)
-            store.commit()
-          }
+          // Generate sufficient versions of store for snapshots
+          generateStoreVersions()

           eventually(timeout(10 seconds)) {
+            // Store should have been reported to the coordinator
             assert(coordinatorRef.getLocation(storeId).nonEmpty, "active instance was not reported")
-          }

-          // Background maintenance should clean up and generate snapshots
-          assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")
-
-          eventually(timeout(10 seconds)) {
-            // Earliest delta file should get cleaned up
-            assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
+            // Background maintenance should clean up and generate snapshots
+            assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")

             // Some snapshots should have been generated
-            val snapshotVersions = (0 to 20).filter { version =>
+            val snapshotVersions = (1 to latestStoreVersion).filter { version =>
               fileExists(provider, version, isSnapshot = true)
             }
             assert(snapshotVersions.nonEmpty, "no snapshot file found")
           }

+          // Generate more versions such that there is another snapshot and
+          // the earliest delta file will be cleaned up
+          generateStoreVersions()
+
+          // Earliest delta file should get cleaned up
+          eventually(timeout(10 seconds)) {
+            assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
+          }
+
           // If driver decides to deactivate all instances of the store, then this instance
           // should be unloaded
           coordinatorRef.deactivateInstances(dir)
@@ -416,7 +431,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
           }

           // Reload the store and verify
-          StateStore.get(storeId, keySchema, valueSchema, 20, storeConf, hadoopConf)
+          StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
           assert(StateStore.isLoaded(storeId))

           // If some other executor loads the store, then this instance should be unloaded
@@ -426,14 +441,14 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
           }

           // Reload the store and verify
-          StateStore.get(storeId, keySchema, valueSchema, 20, storeConf, hadoopConf)
+          StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
           assert(StateStore.isLoaded(storeId))
         }
       }

       // Verify if instance is unloaded if SparkContext is stopped
-      require(SparkEnv.get === null)
       eventually(timeout(10 seconds)) {
+        require(SparkEnv.get === null)
         assert(!StateStore.isLoaded(storeId))
         assert(!StateStore.isMaintenanceRunning)
       }
