Commit 837cc96

Author: David Navas (committed)

Merge pull request #127 from markhamstra/csd-1.5
SPY-875: backported SPARK-11863 and merged Apache branch-1.5
2 parents: 6166f05 + 2c0c771

62 files changed: +766 -168 lines


core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 6 additions & 5 deletions
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
 import java.net.{URI, URL}
+import java.nio.file.Paths
 import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConversions._
@@ -78,15 +79,15 @@ private[spark] object TestUtils {
   }
 
   /**
-   * Create a jar file that contains this set of files. All files will be located at the root
-   * of the jar.
+   * Create a jar file that contains this set of files. All files will be located in the specified
+   * directory or at the root of the jar.
    */
-  def createJar(files: Seq[File], jarFile: File): URL = {
+  def createJar(files: Seq[File], jarFile: File, directoryPrefix: Option[String] = None): URL = {
     val jarFileStream = new FileOutputStream(jarFile)
     val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())
 
     for (file <- files) {
-      val jarEntry = new JarEntry(file.getName)
+      val jarEntry = new JarEntry(Paths.get(directoryPrefix.getOrElse(""), file.getName).toString)
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file)
@@ -118,7 +119,7 @@ private[spark] object TestUtils {
       classpathUrls: Seq[URL]): File = {
     val compiler = ToolProvider.getSystemJavaCompiler
 
-    // Calling this outputs a class file in pwd. It's easier to just rename the file than
+    // Calling this outputs a class file in pwd. It's easier to just rename the files than
     // build a custom FileManager that controls the output location.
     val options = if (classpathUrls.nonEmpty) {
       Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator))
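
A usage sketch of the new optional directoryPrefix parameter. It assumes compilation inside the org.apache.spark package (TestUtils is private[spark]); the object name and the temp files are illustrative only:

package org.apache.spark

import java.io.File

object CreateJarSketch {
  def main(args: Array[String]): Unit = {
    // Any existing files will do; temp files keep the sketch self-contained.
    val files = Seq.tabulate(2) { i =>
      val f = File.createTempFile(s"entry$i", ".txt")
      f.deleteOnExit()
      f
    }
    val jarFile = File.createTempFile("sketch", ".jar")

    // Unchanged default: entries land at the root of the jar.
    TestUtils.createJar(files, jarFile)

    // New behaviour: entries are placed under "lib/" inside the jar.
    TestUtils.createJar(files, jarFile, directoryPrefix = Some("lib"))
  }
}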

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 14 additions & 3 deletions
@@ -39,7 +39,7 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.repository.file.FileRepository
 import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
 
-import org.apache.spark.{SparkUserAppException, SPARK_VERSION}
+import org.apache.spark.{SparkException, SparkUserAppException, SPARK_VERSION}
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -521,8 +521,19 @@ object SparkSubmit {
         sysProps.put("spark.yarn.isPython", "true")
       }
       if (args.principal != null) {
-        require(args.keytab != null, "Keytab must be specified when the keytab is specified")
-        UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+        require(args.keytab != null, "Keytab must be specified when principal is specified")
+        if (!new File(args.keytab).exists()) {
+          throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
+        } else {
+          // Add keytab and principal configurations in sysProps to make them available
+          // for later use; e.g. in spark sql, the isolated class loader used to talk
+          // to HiveMetastore will use these settings. They will be set as Java system
+          // properties and then loaded by SparkConf
+          sysProps.put("spark.yarn.keytab", args.keytab)
+          sysProps.put("spark.yarn.principal", args.principal)
+
+          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+        }
       }
     }
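
A minimal sketch, not Spark's own code, of why the two sysProps.put calls matter: a SparkConf built with loadDefaults = true (the default) reads every spark.* Java system property, so components created later in the same JVM, such as the isolated classloader that talks to the Hive metastore, see the keytab and principal without receiving them explicitly. The object name, path, and principal below are hypothetical:

import org.apache.spark.SparkConf

object KeytabPropsSketch {
  def main(args: Array[String]): Unit = {
    // Normally set by SparkSubmit from --keytab / --principal.
    sys.props("spark.yarn.keytab") = "/etc/security/keytabs/spark.keytab"
    sys.props("spark.yarn.principal") = "spark/host.example.com@EXAMPLE.COM"

    val conf = new SparkConf() // loadDefaults = true: reads spark.* system properties
    println(conf.get("spark.yarn.keytab"))    // /etc/security/keytabs/spark.keytab
    println(conf.get("spark.yarn.principal")) // spark/host.example.com@EXAMPLE.COM
  }
}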

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 4 additions & 6 deletions
@@ -66,12 +66,10 @@ private[spark] class AppClient(
   // A thread pool for registering with masters. Because registering with a master is a blocking
   // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
   // time so that we can register with all masters.
-  private val registerMasterThreadPool = new ThreadPoolExecutor(
-    0,
-    masterRpcAddresses.size, // Make sure we can register with all masters at the same time
-    60L, TimeUnit.SECONDS,
-    new SynchronousQueue[Runnable](),
-    ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+  private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+    "appclient-register-master-threadpool",
+    masterRpcAddresses.length // Make sure we can register with all masters at the same time
+  )
 
   // A scheduled executor for scheduling the registration actions
   private val registrationRetryThread =
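
A small sketch of the registration pattern behind the pool sizing, using plain java.util.concurrent instead of the private[spark] ThreadUtils helper: each registration attempt blocks, so registering with all masters at the same time needs one thread per master. The object name and master addresses are hypothetical:

import java.util.concurrent.{Executors, TimeUnit}

object RegisterAllMastersSketch {
  def main(args: Array[String]): Unit = {
    val masterAddresses = Seq("host1:7077", "host2:7077", "host3:7077")
    // One thread per master, mirroring the sizing of registerMasterThreadPool.
    val pool = Executors.newFixedThreadPool(masterAddresses.size)
    masterAddresses.foreach { addr =>
      pool.execute(new Runnable {
        override def run(): Unit = {
          // A blocking registration RPC to `addr` would go here.
          println(s"registering with $addr")
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}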

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
@@ -925,7 +925,7 @@ private[deploy] class Master(
     }
 
     val eventLogFilePrefix = EventLoggingListener.getLogPath(
-      eventLogDir, app.id, app.desc.eventLogCodec)
+      eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
     val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
     val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
       EventLoggingListener.IN_PROGRESS))
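
The switch to named arguments matters because getLogPath gained an appAttemptId parameter of the same Option[String] type as the codec: a positional call would still compile but would pass the codec into the new slot. A sketch of that hazard, with a hypothetical function whose signature only mirrors the shape of the call above:

object NamedArgsSketch {
  // Hypothetical signature, shaped like getLogPath after the change.
  def logPath(
      dir: String,
      appId: String,
      appAttemptId: Option[String] = None,
      compressionCodecName: Option[String] = None): String = {
    val base = (Seq(dir, appId) ++ appAttemptId).mkString("/")
    base + compressionCodecName.map("." + _).getOrElse("")
  }

  def main(args: Array[String]): Unit = {
    println(logPath("/logs", "app-1", Some("lz4")))                        // /logs/app-1/lz4 (codec silently treated as an attempt id)
    println(logPath("/logs", "app-1", compressionCodecName = Some("lz4"))) // /logs/app-1.lz4 (named argument keeps it in the right slot)
  }
}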

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 4 additions & 6 deletions
@@ -147,12 +147,10 @@ private[deploy] class Worker(
   // A thread pool for registering with masters. Because registering with a master is a blocking
   // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
   // time so that we can register with all masters.
-  private val registerMasterThreadPool = new ThreadPoolExecutor(
-    0,
-    masterRpcAddresses.size, // Make sure we can register with all masters at the same time
-    60L, TimeUnit.SECONDS,
-    new SynchronousQueue[Runnable](),
-    ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))
+  private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+    "worker-register-master-threadpool",
+    masterRpcAddresses.size // Make sure we can register with all masters at the same time
+  )
 
   var coresUsed = 0
   var memoryUsed = 0

core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -267,7 +267,7 @@ private[akka] class ErrorMonitor extends Actor with ActorLogReceive with Logging
   }
 
   override def receiveWithLogging: Actor.Receive = {
-    case Error(cause: Throwable, _, _, message: String) => logError(message, cause)
+    case Error(cause: Throwable, _, _, message: String) => logDebug(message, cause)
   }
 }

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 4 additions & 2 deletions
@@ -818,7 +818,9 @@ class DAGScheduler(
       stage.resetInternalAccumulators()
     }
 
-    val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
+    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
+    // with this Stage
+    val properties = jobIdToActiveJob(jobId).properties
 
     runningStages += stage
     // SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -913,7 +915,7 @@
       stage.pendingPartitions ++= tasks.map(_.partitionId)
       logDebug("New pending partitions: " + stage.pendingPartitions)
       taskScheduler.submitTasks(new TaskSet(
-        tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
+        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
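
The stage now takes its properties from the ActiveJob that is submitting it (jobId) rather than from whichever job first created it, so per-job local properties reach the TaskSet. A small sketch of the standard driver-side API that sets those properties; the pool name, group id, and description are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

object JobPropertiesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("job-properties-sketch"))

    sc.setLocalProperty("spark.scheduler.pool", "production")
    sc.setJobGroup("nightly-etl", "nightly ETL job")

    // The stages of this job are submitted with the pool, group, and description set above.
    sc.parallelize(1 to 100).count()

    sc.stop()
  }
}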

core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala

Lines changed: 2 additions & 2 deletions
@@ -103,16 +103,16 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
     try {
       getTaskResultExecutor.execute(new Runnable {
         override def run(): Unit = Utils.logUncaughtExceptions {
+          val loader = Utils.getContextOrSparkClassLoader
           try {
             if (serializedData != null && serializedData.limit() > 0) {
               reason = serializer.get().deserialize[TaskEndReason](
-                serializedData, Utils.getSparkClassLoader)
+                serializedData, loader)
             }
           } catch {
             case cnd: ClassNotFoundException =>
               // Log an error but keep going here -- the task failed, so not catastrophic
               // if we can't deserialize the reason.
-              val loader = Utils.getContextOrSparkClassLoader
               logError(
                 "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
             case ex: Exception => {}
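
The fix resolves the classloader once, before the try block, and prefers the thread's context classloader so that user-defined TaskEndReason classes (for example from the REPL) can be deserialized; the same loader is then reused in the error message. A minimal sketch of that fallback pattern, an assumption about the shape of Utils.getContextOrSparkClassLoader rather than its exact code:

object ClassLoaderSketch {
  // Prefer the calling thread's context classloader, which may see user or
  // REPL classes; fall back to the loader that loaded this class.
  def contextOrFallback(fallback: ClassLoader = getClass.getClassLoader): ClassLoader =
    Option(Thread.currentThread().getContextClassLoader).getOrElse(fallback)

  def main(args: Array[String]): Unit = {
    val loader = contextOrFallback()
    println(s"Deserializing with classloader: $loader")
  }
}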

core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala

Lines changed: 19 additions & 18 deletions
@@ -57,7 +57,9 @@ private[spark] object ShutdownHookManager extends Logging {
   // Add a shutdown hook to delete the temp dirs when the JVM exits
   addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
     logInfo("Shutdown hook called")
-    shutdownDeletePaths.foreach { dirPath =>
+    // we need to materialize the paths to delete because deleteRecursively removes items from
+    // shutdownDeletePaths as we are traversing through it.
+    shutdownDeletePaths.toArray.foreach { dirPath =>
       try {
         logInfo("Deleting directory " + dirPath)
         Utils.deleteRecursively(new File(dirPath))
@@ -204,7 +206,7 @@ private[spark] object ShutdownHookManager extends Logging {
 private [util] class SparkShutdownHookManager {
 
   private val hooks = new PriorityQueue[SparkShutdownHook]()
-  private var shuttingDown = false
+  @volatile private var shuttingDown = false
 
   /**
    * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
@@ -228,28 +230,27 @@ private [util] class SparkShutdownHookManager {
     }
   }
 
-  def runAll(): Unit = synchronized {
+  def runAll(): Unit = {
     shuttingDown = true
-    while (!hooks.isEmpty()) {
-      Try(Utils.logUncaughtExceptions(hooks.poll().run()))
+    var nextHook: SparkShutdownHook = null
+    while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
+      Try(Utils.logUncaughtExceptions(nextHook.run()))
     }
   }
 
-  def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
-    checkState()
-    val hookRef = new SparkShutdownHook(priority, hook)
-    hooks.add(hookRef)
-    hookRef
-  }
-
-  def remove(ref: AnyRef): Boolean = synchronized {
-    hooks.remove(ref)
+  def add(priority: Int, hook: () => Unit): AnyRef = {
+    hooks.synchronized {
+      if (shuttingDown) {
+        throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
+      }
+      val hookRef = new SparkShutdownHook(priority, hook)
+      hooks.add(hookRef)
+      hookRef
+    }
   }
 
-  private def checkState(): Unit = {
-    if (shuttingDown) {
-      throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
-    }
+  def remove(ref: AnyRef): Boolean = {
+    hooks.synchronized { hooks.remove(ref) }
   }
 
 }
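
runAll no longer holds one lock for the whole drain: it takes one hook at a time under hooks.synchronized and runs it outside the lock, so add and remove stay usable while shutdown is in progress, and the @volatile flag keeps the shutting-down check visible across threads. A condensed, self-contained sketch of that drain pattern; the Hook type and the priorities are illustrative stand-ins, not Spark's classes:

import java.util.{Comparator, PriorityQueue}
import scala.util.Try

object ShutdownDrainSketch {
  // Illustrative stand-in for SparkShutdownHook: a priority and a body.
  final case class Hook(priority: Int, body: () => Unit)

  // Take one hook at a time under the queue's lock, then run it outside the
  // lock, so other threads can still add or remove hooks during the drain.
  def drain(hooks: PriorityQueue[Hook]): Unit = {
    var next: Hook = null
    while ({ next = hooks.synchronized { hooks.poll() }; next != null }) {
      Try(next.body()) // one failing hook must not stop the rest
    }
  }

  def main(args: Array[String]): Unit = {
    // Higher priority runs first, mirroring SparkShutdownHook's ordering.
    val queue = new PriorityQueue[Hook](11, new Comparator[Hook] {
      override def compare(a: Hook, b: Hook): Int = b.priority - a.priority
    })
    queue.add(Hook(25, () => println("delete temp dirs")))
    queue.add(Hook(50, () => println("stop SparkContext")))
    drain(queue) // prints "stop SparkContext" then "delete temp dirs"
  }
}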

core/src/main/scala/org/apache/spark/util/ThreadUtils.scala

Lines changed: 11 additions & 3 deletions
@@ -57,10 +57,18 @@ private[spark] object ThreadUtils {
    * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
    * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
    */
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    new ThreadPoolExecutor(
-      0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
   }
 
   /**
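
The rebuilt pool keeps corePoolSize equal to maximumPoolSize, queues extra work on an unbounded LinkedBlockingQueue instead of rejecting it (which a SynchronousQueue does once all threads are busy), and lets idle core threads expire after keepAliveSeconds. A sketch of an equivalent construction in plain java.util.concurrent for readers outside the private[spark] helper; it omits the named daemon ThreadFactory that Spark supplies:

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

object BoundedCachedPoolSketch {
  def boundedCachedPool(maxThreads: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
    val pool = new ThreadPoolExecutor(
      maxThreads,                // corePoolSize: threads created before work is queued
      maxThreads,                // maximumPoolSize: unused beyond core with an unbounded queue
      keepAliveSeconds,
      TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable])
    pool.allowCoreThreadTimeOut(true) // reclaim idle core threads after the keep-alive
    pool
  }

  def main(args: Array[String]): Unit = {
    val pool = boundedCachedPool(maxThreads = 4)
    (1 to 8).foreach { i =>
      pool.execute(new Runnable {
        override def run(): Unit = println(s"task $i on ${Thread.currentThread().getName}")
      })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}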
