8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
@@ -19,7 +19,7 @@ package org.apache.spark

import java.util.{Timer, TimerTask}
import java.util.concurrent.ConcurrentHashMap
-import java.util.function.{Consumer, Function}
+import java.util.function.Consumer

import scala.collection.mutable.ArrayBuffer

@@ -202,10 +202,8 @@ private[spark] class BarrierCoordinator(
case request @ RequestToSync(numTasks, stageId, stageAttemptId, _, _) =>
// Get or init the ContextBarrierState correspond to the stage attempt.
val barrierId = ContextBarrierId(stageId, stageAttemptId)
-states.computeIfAbsent(barrierId, new Function[ContextBarrierId, ContextBarrierState] {
-  override def apply(key: ContextBarrierId): ContextBarrierState =
-    new ContextBarrierState(key, numTasks)
-})
+states.computeIfAbsent(barrierId,
Review comment from the author: 90% of the changes are like this, converting an anonymous inner class to a SAM expression. (A standalone sketch of the pattern follows this file's diff.)

+  (key: ContextBarrierId) => new ContextBarrierState(key, numTasks))
val barrierState = states.get(barrierId)

barrierState.handleRequest(context, request)
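A minimal standalone sketch of the before/after pattern the author's comment describes: an explicit anonymous inner class replaced by a lambda that Scala 2.12+ converts to the Java functional interface. The cache and method names here are hypothetical, for illustration only:

import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function

object SamConversionSketch {
  private val cache = new ConcurrentHashMap[String, String]()

  // Before (Scala 2.11 style): spell out the anonymous inner class
  def before(key: String): String =
    cache.computeIfAbsent(key, new Function[String, String] {
      override def apply(k: String): String = k.toUpperCase
    })

  // After (Scala 2.12+): the lambda is converted automatically to the
  // single-abstract-method (SAM) interface java.util.function.Function
  def after(key: String): String =
    cache.computeIfAbsent(key, (k: String) => k.toUpperCase)
}

Both methods behave identically; the second form just lets the compiler emit a lambda instead of a separate anonymous class.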
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -123,9 +123,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.setDaemon(true)
cleaningThread.setName("Spark Context Cleaner")
cleaningThread.start()
-periodicGCService.scheduleAtFixedRate(new Runnable {
-  override def run(): Unit = System.gc()
-}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
+periodicGCService.scheduleAtFixedRate(() => System.gc(),
+  periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}

/**
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -98,11 +98,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")

override def onStart(): Unit = {
-timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
-  override def run(): Unit = Utils.tryLogNonFatalError {
-    Option(self).foreach(_.ask[Boolean](ExpireDeadHosts))
-  }
-}, 0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
+timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(
+  () => Utils.tryLogNonFatalError { Option(self).foreach(_.ask[Boolean](ExpireDeadHosts)) },
+  0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
6 changes: 2 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -62,9 +62,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria

@transient private lazy val reader: ConfigReader = {
val _reader = new ConfigReader(new SparkConfigProvider(settings))
-_reader.bindEnv(new ConfigProvider {
-  override def get(key: String): Option[String] = Option(getenv(key))
-})
+_reader.bindEnv((key: String) => Option(getenv(key)))
_reader
}

@@ -392,7 +390,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria

/** Get an optional value, applying variable substitution. */
private[spark] def getWithSubstitution(key: String): Option[String] = {
-getOption(key).map(reader.substitute(_))
+getOption(key).map(reader.substitute)
Review comment from the author: In the files I did change, I cleaned up a few other things in nearby code, like this. Other examples are using .nonEmpty and removing redundant braces, etc. (A sketch of these cleanups follows this file's diff.)

}

/** Get all parameters as a list of pairs */
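A short sketch of the nearby cleanups the author's comment mentions: passing a method directly (eta-expansion) instead of wrapping it in a placeholder lambda, and using .nonEmpty instead of comparing against None. The helper and names here are hypothetical:

object NearbyCleanupSketch {
  private def substitute(s: String): String = s.trim

  // Before: redundant lambda wrapper and an explicit comparison with None
  def before(key: Option[String]): Boolean =
    key.map(s => substitute(s)) != None

  // After: pass the method itself, and let Option.nonEmpty say what is meant
  def after(key: Option[String]): Boolean =
    key.map(substitute).nonEmpty
}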
@@ -60,11 +60,7 @@ object PythonRunner {
.javaAddress(localhost)
.callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
.build()
-val thread = new Thread(new Runnable() {
-  override def run(): Unit = Utils.logUncaughtExceptions {
-    gatewayServer.start()
-  }
-})
+val thread = new Thread(() => Utils.logUncaughtExceptions { gatewayServer.start() })
thread.setName("py4j-gateway-init")
thread.setDaemon(true)
thread.start()
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
-import java.util.{Arrays, Comparator, Date, Locale}
+import java.util.{Arrays, Date, Locale}

import scala.collection.JavaConverters._
import scala.collection.immutable.Map
@@ -270,11 +270,8 @@ private[spark] class SparkHadoopUtil extends Logging {
name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
}
})
-Arrays.sort(fileStatuses, new Comparator[FileStatus] {
-  override def compare(o1: FileStatus, o2: FileStatus): Int = {
-    Longs.compare(o1.getModificationTime, o2.getModificationTime)
-  }
-})
+Arrays.sort(fileStatuses, (o1: FileStatus, o2: FileStatus) =>
+  Longs.compare(o1.getModificationTime, o2.getModificationTime))
fileStatuses
} catch {
case NonFatal(e) =>
@@ -465,7 +462,7 @@ private[spark] object SparkHadoopUtil {
// scalastyle:on line.size.limit
def createNonECFile(fs: FileSystem, path: Path): FSDataOutputStream = {
try {
-// Use reflection as this uses apis only avialable in hadoop 3
+// Use reflection as this uses APIs only available in Hadoop 3
val builderMethod = fs.getClass().getMethod("createFile", classOf[Path])
// the builder api does not resolve relative paths, nor does it create parent dirs, while
// the old api does.
@@ -186,13 +186,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* Return a runnable that performs the given operation on the event logs.
* This operation is expected to be executed periodically.
*/
-private def getRunner(operateFun: () => Unit): Runnable = {
-  new Runnable() {
-    override def run(): Unit = Utils.tryOrExit {
-      operateFun()
-    }
-  }
-}
+private def getRunner(operateFun: () => Unit): Runnable =
+  () => Utils.tryOrExit { operateFun() }

/**
* Fixed size thread pool to fetch and parse log files.
@@ -221,29 +216,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait
// for the FS to leave safe mode before enabling polling. This allows the main history server
// UI to be shown (so that the user can see the HDFS status).
-val initThread = new Thread(new Runnable() {
-  override def run(): Unit = {
-    try {
-      while (isFsInSafeMode()) {
-        logInfo("HDFS is still in safe mode. Waiting...")
-        val deadline = clock.getTimeMillis() +
-          TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
-        clock.waitTillTime(deadline)
-      }
-      startPolling()
-    } catch {
-      case _: InterruptedException =>
+val initThread = new Thread(() => {
+  try {
+    while (isFsInSafeMode()) {
+      logInfo("HDFS is still in safe mode. Waiting...")
+      val deadline = clock.getTimeMillis() +
+        TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
+      clock.waitTillTime(deadline)
+    }
+    startPolling()
+  } catch {
+    case _: InterruptedException =>
}
})
initThread.setDaemon(true)
initThread.setName(s"${getClass().getSimpleName()}-init")
initThread.setUncaughtExceptionHandler(errorHandler.getOrElse(
-new Thread.UncaughtExceptionHandler() {
-  override def uncaughtException(t: Thread, e: Throwable): Unit = {
-    logError("Error initializing FsHistoryProvider.", e)
-    System.exit(1)
-  }
+(_: Thread, e: Throwable) => {
+  logError("Error initializing FsHistoryProvider.", e)
+  System.exit(1)
+}))
initThread.start()
initThread
@@ -517,9 +508,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

val tasks = updated.flatMap { entry =>
try {
-val task: Future[Unit] = replayExecutor.submit(new Runnable {
-  override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
-}, Unit)
+val task: Future[Unit] = replayExecutor.submit(
+  () => mergeApplicationListing(entry, newLastScanTime, true))
Some(task -> entry.getPath)
} catch {
// let the iteration over the updated entries break, since an exception on
@@ -150,11 +150,9 @@ private[deploy] class Master(
logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
s"Applications UIs are available at $masterWebUiUrl")
}
-checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
-  override def run(): Unit = Utils.tryLogNonFatalError {
-    self.send(CheckForWorkerTimeOut)
-  }
-}, 0, workerTimeoutMs, TimeUnit.MILLISECONDS)
+checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
+  () => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },
+  0, workerTimeoutMs, TimeUnit.MILLISECONDS)

if (restServerEnabled) {
val port = conf.get(MASTER_REST_SERVER_PORT)
38 changes: 14 additions & 24 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -325,11 +325,9 @@ private[deploy] class Worker(
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
registrationRetryTimer.foreach(_.cancel(true))
registrationRetryTimer = Some(
-forwardMessageScheduler.scheduleAtFixedRate(new Runnable {
-  override def run(): Unit = Utils.tryLogNonFatalError {
-    self.send(ReregisterWithMaster)
-  }
-}, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
+forwardMessageScheduler.scheduleAtFixedRate(
+  () => Utils.tryLogNonFatalError { self.send(ReregisterWithMaster) },
+  PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
}
@@ -341,7 +339,7 @@
}

/**
-* Cancel last registeration retry, or do nothing if no retry
+* Cancel last registration retry, or do nothing if no retry
*/
private def cancelLastRegistrationRetry(): Unit = {
if (registerMasterFutures != null) {
@@ -361,11 +359,7 @@
registerMasterFutures = tryRegisterAllMasters()
connectionAttemptCount = 0
registrationRetryTimer = Some(forwardMessageScheduler.scheduleAtFixedRate(
-new Runnable {
-  override def run(): Unit = Utils.tryLogNonFatalError {
-    Option(self).foreach(_.send(ReregisterWithMaster))
-  }
-},
+() => Utils.tryLogNonFatalError { Option(self).foreach(_.send(ReregisterWithMaster)) },
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
@@ -407,19 +401,15 @@
}
registered = true
changeMaster(masterRef, masterWebUiUrl, masterAddress)
-forwardMessageScheduler.scheduleAtFixedRate(new Runnable {
-  override def run(): Unit = Utils.tryLogNonFatalError {
-    self.send(SendHeartbeat)
-  }
-}, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
+forwardMessageScheduler.scheduleAtFixedRate(
+  () => Utils.tryLogNonFatalError { self.send(SendHeartbeat) },
+  0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
if (CLEANUP_ENABLED) {
logInfo(
s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
-forwardMessageScheduler.scheduleAtFixedRate(new Runnable {
-  override def run(): Unit = Utils.tryLogNonFatalError {
-    self.send(WorkDirCleanup)
-  }
-}, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
+forwardMessageScheduler.scheduleAtFixedRate(
+  () => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) },
+  CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
}

val execs = executors.values.map { e =>
@@ -568,7 +558,7 @@
}
}

-case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
+case executorStateChanged: ExecutorStateChanged =>
handleExecutorStateChanged(executorStateChanged)

case KillExecutor(masterUrl, appId, execId) =>
@@ -632,7 +622,7 @@

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (master.exists(_.address == remoteAddress) ||
-masterAddressToConnect.exists(_ == remoteAddress)) {
+masterAddressToConnect.contains(remoteAddress)) {
logInfo(s"$remoteAddress Disassociated !")
masterDisconnected()
}
@@ -815,7 +805,7 @@ private[deploy] object Worker extends Logging {
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
-val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
+val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
11 changes: 4 additions & 7 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -89,17 +89,14 @@ private[spark] class Executor(
}

// Start worker thread pool
+// Use UninterruptibleThread to run tasks so that we can allow running codes without being
+// interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
+// will hang forever if some methods are interrupted.
private val threadPool = {
val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Executor task launch worker-%d")
-.setThreadFactory(new ThreadFactory {
-  override def newThread(r: Runnable): Thread =
-    // Use UninterruptibleThread to run tasks so that we can allow running codes without being
-    // interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
-    // will hang forever if some methods are interrupted.
-    new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
-})
+.setThreadFactory((r: Runnable) => new UninterruptibleThread(r, "unused"))
.build()
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
@@ -44,7 +44,7 @@ private[spark] abstract class LauncherBackend {
.map(_.toInt)
val secret = conf.getOption(LauncherProtocol.CONF_LAUNCHER_SECRET)
.orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET))
-if (port != None && secret != None) {
+if (port.isDefined && secret.isDefined) {
val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
connection = new BackendConnection(s)
connection.send(new Hello(secret.get, SPARK_VERSION))
@@ -94,11 +94,8 @@
protected def onDisconnected() : Unit = { }

private def fireStopRequest(): Unit = {
-val thread = LauncherBackend.threadFactory.newThread(new Runnable() {
-  override def run(): Unit = Utils.tryLogNonFatalError {
-    onStopRequest()
-  }
-})
+val thread = LauncherBackend.threadFactory.newThread(
+  () => Utils.tryLogNonFatalError { onStopRequest() })
thread.start()
}

@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit

import scala.collection.mutable

-import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
+import com.codahale.metrics.{Metric, MetricRegistry}
import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{SecurityManager, SparkConf}
@@ -168,9 +168,7 @@ private[spark] class MetricsSystem private (
def removeSource(source: Source) {
sources -= source
val regName = buildRegistryName(source)
-registry.removeMatching(new MetricFilter {
-  def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
-})
+registry.removeMatching((name: String, _: Metric) => name.startsWith(regName))
}

private def registerSources() {