Skip to content

Commit

Permalink
[SPARK-11020][CORE] Wait for HDFS to leave safe mode before initializ…
Browse files Browse the repository at this point in the history
…ing HS.

Large HDFS clusters may take a while to leave safe mode when starting; this change
makes the HS wait for that before doing checks about its configuration. This means
the HS won't stop right away if HDFS is in safe mode and the configuration is not
correct, but that should be a very uncommon situation.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#9043 from vanzin/SPARK-11020.
  • Loading branch information
Marcelo Vanzin committed Nov 1, 2015
1 parent 046e32e commit cf04fdf
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import scala.collection.mutable
import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.security.AccessControlException

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
Expand All @@ -52,6 +53,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

private val NOT_STARTED = "<Not Started>"

// Interval between safemode checks.
private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
"spark.history.fs.safemodeCheck.interval", "5s")

// Interval between each check for event log updates
private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")

Expand Down Expand Up @@ -107,9 +112,57 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

initialize()
// Background thread that waits for HDFS to leave safe mode before polling starts.
// This is null when the FS was not in safe mode (initialize() then starts polling
// immediately and returns null), or when initialization is skipped entirely.
// The "spark.history.testing.skipInitialize" conf option is used only by tests, which
// call initialize() themselves with a custom error handler.
val initThread = if (!conf.getBoolean("spark.history.testing.skipInitialize", false)) {
  initialize(None)
} else {
  null
}

/**
 * Performs initialization that requires the FS to be usable. If HDFS is not in safe mode,
 * polling starts immediately and null is returned; otherwise a daemon thread is spawned
 * to wait for safe mode to end, and that thread is returned (tests join/interrupt it).
 *
 * @param errorHandler optional handler for uncaught exceptions in the wait thread; when
 *                     None, a default handler logs the error and exits the JVM.
 */
private[history] def initialize(errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = {
  if (!isFsInSafeMode()) {
    startPolling()
    return null
  }

  // Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait
  // for the FS to leave safe mode before enabling polling. This allows the main history server
  // UI to be shown (so that the user can see the HDFS status).
  //
  // The synchronization in the run() method is needed because of the tests; mockito can
  // misbehave if the test is modifying the mocked methods while the thread is calling
  // them.
  val initThread = new Thread(new Runnable() {
    override def run(): Unit = {
      try {
        clock.synchronized {
          while (isFsInSafeMode()) {
            logInfo("HDFS is still in safe mode. Waiting...")
            // Wait through the Clock abstraction so tests driving a ManualClock can
            // control when the next safe-mode check happens.
            val deadline = clock.getTimeMillis() +
              TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
            clock.waitTillTime(deadline)
          }
        }
        startPolling()
      } catch {
        // Interruption (see stop()) simply aborts the wait; there is nothing to clean up.
        case _: InterruptedException =>
      }
    }
  })
  initThread.setDaemon(true)
  initThread.setName(s"${getClass().getSimpleName()}-init")
  // Without an injected handler, a failure to initialize is fatal for the server process.
  initThread.setUncaughtExceptionHandler(errorHandler.getOrElse(
    new Thread.UncaughtExceptionHandler() {
      override def uncaughtException(t: Thread, e: Throwable): Unit = {
        logError("Error initializing FsHistoryProvider.", e)
        System.exit(1)
      }
    }))
  initThread.start()
  initThread
}

private def initialize(): Unit = {
private def startPolling(): Unit = {
// Validate the log directory.
val path = new Path(logDir)
if (!fs.exists(path)) {
Expand Down Expand Up @@ -170,7 +223,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

override def getConfig(): Map[String, String] = Map("Event log directory" -> logDir.toString)
/**
 * Configuration shown in the history server UI. When HDFS is in safe mode, an extra
 * entry surfaces that state so users know why application logs are unavailable.
 */
override def getConfig(): Map[String, String] = {
  val base = Map("Event log directory" -> logDir.toString)
  if (isFsInSafeMode()) {
    base + ("HDFS State" -> "In safe mode, application logs not available.")
  } else {
    base
  }
}

/**
 * Stops the provider. If the safe-mode wait thread is still running, it is interrupted
 * and joined so no background work outlives the provider.
 */
override def stop(): Unit = {
  Option(initThread).filter(_.isAlive()).foreach { t =>
    t.interrupt()
    t.join()
  }
}

/**
* Builds the application list based on the current contents of the log directory.
Expand Down Expand Up @@ -585,6 +652,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

/**
* Checks whether HDFS is in safe mode. The API is slightly different between hadoop 1 and 2,
* so we have to resort to ugly reflection (as usual...).
*
* Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
* makes it more public than not.
*/
/**
 * Whether the backing file system is an HDFS instance currently in safe mode. File
 * systems other than HDFS have no safe mode concept, so they always report false.
 *
 * Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical
 * reasons makes it more public than not.
 */
private[history] def isFsInSafeMode(): Boolean = fs match {
  case hdfs: DistributedFileSystem => isFsInSafeMode(hdfs)
  case _ => false
}

// For testing.
private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
val hadoop1Class = "org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction"
val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"
val actionClass: Class[_] =
try {
getClass().getClassLoader().loadClass(hadoop2Class)
} catch {
case _: ClassNotFoundException =>
getClass().getClassLoader().loadClass(hadoop1Class)
}

val action = actionClass.getField("SAFEMODE_GET").get(null)
val method = dfs.getClass().getMethod("setSafeMode", action.getClass())
method.invoke(dfs, action).asInstanceOf[Boolean]
}

}

private[history] object FsHistoryProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ import java.util.concurrent.TimeUnit
import java.util.zip.{ZipInputStream, ZipOutputStream}

import scala.io.Source
import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.base.Charsets
import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.json4s.jackson.JsonMethods._
import org.mockito.Matchers.any
import org.mockito.Mockito.{doReturn, mock, spy, verify, when}
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._

import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.io._
Expand Down Expand Up @@ -407,6 +413,65 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}

test("provider correctly checks whether fs is in safe mode") {
  // The mocked DistributedFileSystem never reports safe mode, so the reflective
  // isFsInSafeMode(dfs) path should complete and return false.
  val provider = spy(new FsHistoryProvider(createTestConf()))
  val dfs = mock(classOf[DistributedFileSystem])
  // Asserts that safe mode is false because we can't really control the return value of the mock,
  // since the API is different between hadoop 1 and 2.
  assert(!provider.isFsInSafeMode(dfs))
}

test("provider waits for safe mode to finish before initializing") {
  val clock = new ManualClock()
  // Skip the constructor-time initialize() so this test can invoke it on a spied
  // provider whose isFsInSafeMode() has been stubbed.
  val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
  val provider = spy(new FsHistoryProvider(conf, clock))
  doReturn(true).when(provider).isFsInSafeMode()

  val initThread = provider.initialize(None)
  try {
    // While in safe mode, getConfig() should surface the HDFS state to the UI.
    provider.getConfig().keys should contain ("HDFS State")

    // Advancing the clock alone (still in safe mode) must not drop the notice.
    clock.setTime(5000)
    provider.getConfig().keys should contain ("HDFS State")

    // Synchronization needed because of mockito.
    clock.synchronized {
      doReturn(false).when(provider).isFsInSafeMode()
      clock.setTime(10000)
    }

    // Once safe mode ends, the init thread starts polling and the notice disappears.
    eventually(timeout(1 second), interval(10 millis)) {
      provider.getConfig().keys should not contain ("HDFS State")
    }
  } finally {
    provider.stop()
  }
}

test("provider reports error after FS leaves safe mode") {
  // Delete the log directory so that the post-safe-mode startup fails — presumably in
  // startPolling()'s log-directory validation; confirm against FsHistoryProvider.
  testDir.delete()
  val clock = new ManualClock()
  val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
  val provider = spy(new FsHistoryProvider(conf, clock))
  doReturn(true).when(provider).isFsInSafeMode()

  // Inject a mock handler so the init thread's failure can be observed instead of
  // triggering the default log-and-exit behavior.
  val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler])
  val initThread = provider.initialize(Some(errorHandler))
  try {
    // Synchronization needed because of mockito.
    clock.synchronized {
      doReturn(false).when(provider).isFsInSafeMode()
      clock.setTime(10000)
    }

    // The init thread's uncaught exception should reach the injected handler.
    eventually(timeout(1 second), interval(10 millis)) {
      verify(errorHandler).uncaughtException(any(), any())
    }
  } finally {
    provider.stop()
  }
}

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
Expand Down

0 comments on commit cf04fdf

Please sign in to comment.