[SPARK-17624][SQL][STREAMING][TEST] Fixed flaky StateStoreSuite.maintenance #15592

Closed · wants to merge 1 commit
@@ -159,7 +159,7 @@ private[state] class HDFSBackedStateStoreProvider(
} catch {
case NonFatal(e) =>
throw new IllegalStateException(
s"Error committing version $newVersion into ${HDFSBackedStateStoreProvider.this}", e)
s"Error committing version $newVersion into $this", e)
}
}

@@ -205,6 +205,10 @@ private[state] class HDFSBackedStateStoreProvider(
override private[state] def hasCommitted: Boolean = {
state == COMMITTED
}

override def toString(): String = {
s"HDFSStateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
}
}

/** Get the state store for making updates to create a new `version` of the store. */
@@ -215,7 +219,7 @@ private[state] class HDFSBackedStateStoreProvider(
newMap.putAll(loadMap(version))
}
val store = new HDFSBackedStateStore(version, newMap)
logInfo(s"Retrieved version $version of $this for update")
logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for update")
store
}

@@ -231,7 +235,7 @@ private[state] class HDFSBackedStateStoreProvider(
}

override def toString(): String = {
s"StateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
s"HDFSStateStoreProvider[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
}

/* Internal classes and methods */
@@ -493,10 +497,12 @@ private[state] class HDFSBackedStateStoreProvider(
val mapsToRemove = loadedMaps.keys.filter(_ < earliestVersionToRetain).toSeq
mapsToRemove.foreach(loadedMaps.remove)
}
files.filter(_.version < earliestFileToRetain.version).foreach { f =>
val filesToDelete = files.filter(_.version < earliestFileToRetain.version)
filesToDelete.foreach { f =>
fs.delete(f.path, true)
}
logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this")
logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this: " +
filesToDelete.mkString(", "))
}
}
} catch {
@@ -560,7 +566,7 @@ private[state] class HDFSBackedStateStoreProvider(
}
}
val storeFiles = versionToFiles.values.toSeq.sortBy(_.version)
logDebug(s"Current set of files for $this: $storeFiles")
logDebug(s"Current set of files for $this: ${storeFiles.mkString(", ")}")
storeFiles
}
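A side note on the cleanup hunk above: maintenance first evicts old in-memory maps, then deletes every on-disk file whose version is older than the earliest file to retain, and after this patch it also logs exactly which files were deleted. Below is a rough, self-contained sketch of that retention rule; StoreFile, selectFilesToDelete and minVersionsToRetain are made-up stand-ins for illustration, not Spark's actual classes or configuration.

object RetentionSketch extends App {
  // Made-up stand-in for a state store file (delta or snapshot) of a given version.
  case class StoreFile(version: Long, path: String, isSnapshot: Boolean)

  // Roughly: keep the most recent minVersionsToRetain versions; everything older is deletable.
  def selectFilesToDelete(
      files: Seq[StoreFile],
      latestVersion: Long,
      minVersionsToRetain: Int): Seq[StoreFile] = {
    val earliestVersionToRetain = latestVersion - minVersionsToRetain + 1
    if (earliestVersionToRetain <= 0) Seq.empty
    else files.filter(_.version < earliestVersionToRetain)
  }

  // Example: 20 delta files with 10 versions retained -> versions 1 to 10 get deleted and logged.
  val files = (1L to 20L).map(v => StoreFile(v, s"$v.delta", isSnapshot = false))
  val filesToDelete = selectFilesToDelete(files, latestVersion = 20L, minVersionsToRetain = 10)
  println("Deleted files older than 11: " + filesToDelete.map(_.path).mkString(", "))
}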

@@ -38,7 +38,7 @@ private case class VerifyIfInstanceActive(storeId: StateStoreId, executorId: Str
private case class GetLocation(storeId: StateStoreId)
extends StateStoreCoordinatorMessage

private case class DeactivateInstances(storeRootLocation: String)
private case class DeactivateInstances(checkpointLocation: String)
extends StateStoreCoordinatorMessage

private object StopCoordinator
@@ -111,11 +111,13 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
* Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster,
* and get their locations for job scheduling.
*/
private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
extends ThreadSafeRpcEndpoint with Logging {
private val instances = new mutable.HashMap[StateStoreId, ExecutorCacheTaskLocation]

override def receive: PartialFunction[Any, Unit] = {
case ReportActiveInstance(id, host, executorId) =>
logDebug(s"Reported state store $id is active at $executorId")
instances.put(id, ExecutorCacheTaskLocation(host, executorId))
}

@@ -125,19 +127,25 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadS
case Some(location) => location.executorId == execId
case None => false
}
logDebug(s"Verified that state store $id is active: $response")
context.reply(response)

case GetLocation(id) =>
context.reply(instances.get(id).map(_.toString))
val executorId = instances.get(id).map(_.toString)
logDebug(s"Got location of the state store $id: $executorId")
context.reply(executorId)

case DeactivateInstances(loc) =>
case DeactivateInstances(checkpointLocation) =>
val storeIdsToRemove =
instances.keys.filter(_.checkpointLocation == loc).toSeq
instances.keys.filter(_.checkpointLocation == checkpointLocation).toSeq
instances --= storeIdsToRemove
logDebug(s"Deactivating instances related to checkpoint location $checkpointLocation: " +
storeIdsToRemove.mkString(", "))
context.reply(true)

case StopCoordinator =>
stop() // Stop before replying to ensure that endpoint name has been deregistered
logInfo("StateStoreCoordinator stopped")
context.reply(true)
}
}
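For context on the DeactivateInstances handler above: the coordinator drops every tracked StateStoreId whose checkpointLocation matches the request and, after this patch, logs which ones were removed. A minimal standalone sketch of that bookkeeping follows; the simplified StateStoreId and the plain executor-id strings (instead of ExecutorCacheTaskLocation) are assumptions for illustration only.

import scala.collection.mutable

object DeactivationSketch extends App {
  // Simplified stand-in for the StateStoreId of this era (checkpoint location, operator, partition).
  case class StateStoreId(checkpointLocation: String, operatorId: Long, partitionId: Int)

  // Which executor last reported each store instance as active (plain strings for brevity).
  val instances = mutable.HashMap(
    StateStoreId("/chk/queryA", 0L, 0) -> "executor-1",
    StateStoreId("/chk/queryA", 0L, 1) -> "executor-2",
    StateStoreId("/chk/queryB", 0L, 0) -> "executor-1")

  def deactivateInstances(checkpointLocation: String): Seq[StateStoreId] = {
    val storeIdsToRemove =
      instances.keys.filter(_.checkpointLocation == checkpointLocation).toSeq
    instances --= storeIdsToRemove
    storeIdsToRemove
  }

  // Deactivating queryA removes both of its partitions; queryB stays registered.
  println("Deactivated: " + deactivateInstances("/chk/queryA").mkString(", "))
  assert(instances.keySet == Set(StateStoreId("/chk/queryB", 0L, 0)))
}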
@@ -367,7 +367,10 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
// Make maintenance thread do snapshots and cleanups very fast
.set(StateStore.MAINTENANCE_INTERVAL_CONFIG, "10ms")
// Make sure that when SparkContext stops, the StateStore maintenance thread 'quickly'
// fails to talk to the StateStoreCoordinator and unloads all the StateStores
.set("spark.rpc.numRetries", "1")
val opId = 0
val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
@@ -377,37 +380,49 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
val provider = new HDFSBackedStateStoreProvider(
storeId, keySchema, valueSchema, storeConf, hadoopConf)

var latestStoreVersion = 0

def generateStoreVersions() {
for (i <- 1 to 20) {
val store = StateStore.get(
storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
put(store, "a", i)
store.commit()
latestStoreVersion += 1
}
}

quietly {
withSpark(new SparkContext(conf)) { sc =>
withCoordinatorRef(sc) { coordinatorRef =>
require(!StateStore.isMaintenanceRunning, "StateStore is unexpectedly running")

for (i <- 1 to 20) {
val store = StateStore.get(
storeId, keySchema, valueSchema, i - 1, storeConf, hadoopConf)
put(store, "a", i)
store.commit()
}
// Generate sufficient versions of store for snapshots
generateStoreVersions()

eventually(timeout(10 seconds)) {
// Store should have been reported to the coordinator
assert(coordinatorRef.getLocation(storeId).nonEmpty, "active instance was not reported")
}

// Background maintenance should clean up and generate snapshots
assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")

eventually(timeout(10 seconds)) {
// Earliest delta file should get cleaned up
assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
// Background maintenance should clean up and generate snapshots
assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")

// Some snapshots should have been generated
val snapshotVersions = (0 to 20).filter { version =>
val snapshotVersions = (1 to latestStoreVersion).filter { version =>
fileExists(provider, version, isSnapshot = true)
}
assert(snapshotVersions.nonEmpty, "no snapshot file found")
}

// Generate more versions such that there is another snapshot and
// the earliest delta file will be cleaned up
generateStoreVersions()

// Earliest delta file should get cleaned up
eventually(timeout(10 seconds)) {
assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
}

// If driver decides to deactivate all instances of the store, then this instance
// should be unloaded
coordinatorRef.deactivateInstances(dir)
@@ -416,7 +431,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
}

// Reload the store and verify
StateStore.get(storeId, keySchema, valueSchema, 20, storeConf, hadoopConf)
StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
assert(StateStore.isLoaded(storeId))

// If some other executor loads the store, then this instance should be unloaded
@@ -426,14 +441,14 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
}

// Reload the store and verify
StateStore.get(storeId, keySchema, valueSchema, 20, storeConf, hadoopConf)
StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
assert(StateStore.isLoaded(storeId))
}
}

// Verify if instance is unloaded if SparkContext is stopped
require(SparkEnv.get === null)
eventually(timeout(10 seconds)) {
require(SparkEnv.get === null)
assert(!StateStore.isLoaded(storeId))
assert(!StateStore.isMaintenanceRunning)
}
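One note on the reworked test above: the checks that depend on asynchronous behaviour (the maintenance thread producing snapshots and deleting old delta files, and the stores unloading after SparkContext stops) are wrapped in ScalaTest's eventually, which retries the enclosed assertions until they pass or the timeout expires. A minimal sketch of that pattern in isolation; the background thread and the maintenanceRan flag are made up for illustration.

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallySketch extends App {
  // Stand-in for asynchronous work such as the StateStore maintenance thread.
  @volatile var maintenanceRan = false
  new Thread(new Runnable {
    override def run(): Unit = { Thread.sleep(100); maintenanceRan = true }
  }).start()

  // Retry the block until it passes or 10 seconds elapse, instead of asserting once.
  eventually(timeout(10.seconds)) {
    assert(maintenanceRan, "background work did not complete in time")
  }
  println("condition became true within the timeout")
}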