
Commit cc30430

Merge remote-tracking branch 'origin/master' into parquetMetastore

Conflicts:
    sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

2 parents: 4f3d54f + 9eb74c7

119 files changed: 2791 additions, 1404 deletions


.rat-excludes

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ log4j-defaults.properties
 bootstrap-tooltip.js
 jquery-1.11.1.min.js
 sorttable.js
+.*avsc
 .*txt
 .*json
 .*data

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 7 additions & 5 deletions
@@ -66,10 +66,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

   /**
    * Whether the cleaning thread will block on cleanup tasks.
-   * This is set to true only for tests.
+   *
+   * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
+   * workaround for the issue, which is ultimately caused by the way the BlockManager actors
+   * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+   * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
+   * longer in scope.
    */
   private val blockOnCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.blocking", false)
+    "spark.cleaner.referenceTracking.blocking", true)

   @volatile private var stopped = false

@@ -174,9 +179,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-
-  // Used for testing. These methods explicitly blocks until cleanup is completed
-  // to ensure that more reliable testing.
 }

 private object ContextCleaner {
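
The hunk above flips the default of spark.cleaner.referenceTracking.blocking from false to true. A minimal sketch of how a driver could opt back into the old non-blocking cleanup behaviour through SparkConf (the app name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Explicitly restore the pre-SPARK-3015 non-blocking cleanup behaviour.
val conf = new SparkConf()
  .setAppName("cleaner-config-sketch")  // illustrative app name
  .set("spark.cleaner.referenceTracking.blocking", "false")
val sc = new SparkContext(conf)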

core/src/main/scala/org/apache/spark/InterruptibleIterator.scala

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator
     // is allowed. The assumption is that Thread.interrupted does not have a memory fence in read
     // (just a volatile field in C), while context.interrupted is a volatile in the JVM, which
     // introduces an expensive read fence.
-    if (context.interrupted) {
+    if (context.isInterrupted) {
       throw new TaskKilledException
     } else {
       delegate.hasNext

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 11 additions & 11 deletions
@@ -210,12 +210,22 @@ object SparkEnv extends Logging {
       "MapOutputTracker",
       new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

+    // Let the user specify short names for shuffle managers
+    val shortShuffleMgrNames = Map(
+      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
+      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
+    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
+    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
+
+    val shuffleMemoryManager = new ShuffleMemoryManager(conf)
+
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)

     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, securityManager, mapOutputTracker)
+      serializer, conf, securityManager, mapOutputTracker, shuffleManager)

     val connectionManager = blockManager.connectionManager

@@ -250,16 +260,6 @@ object SparkEnv extends Logging {
       "."
     }

-    // Let the user specify short names for shuffle managers
-    val shortShuffleMgrNames = Map(
-      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
-      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
-    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
-    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
-    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
-
-    val shuffleMemoryManager = new ShuffleMemoryManager(conf)
-
     // Warn about deprecated spark.cache.class property
     if (conf.contains("spark.cache.class")) {
       logWarning("The spark.cache.class property is no longer being used! Specify storage " +

core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 56 additions & 7 deletions
@@ -21,10 +21,18 @@ import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.util.TaskCompletionListener
+

 /**
  * :: DeveloperApi ::
  * Contextual information about a task which can be read or mutated during execution.
+ *
+ * @param stageId stage id
+ * @param partitionId index of the partition
+ * @param attemptId the number of attempts to execute this task
+ * @param runningLocally whether the task is running locally in the driver JVM
+ * @param taskMetrics performance metrics of the task
  */
 @DeveloperApi
 class TaskContext(
@@ -39,27 +47,68 @@ class TaskContext(
   def splitId = partitionId

   // List of callback functions to execute when the task completes.
-  @transient private val onCompleteCallbacks = new ArrayBuffer[() => Unit]
+  @transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]

   // Whether the corresponding task has been killed.
-  @volatile var interrupted: Boolean = false
+  @volatile private var interrupted: Boolean = false
+
+  // Whether the task has completed.
+  @volatile private var completed: Boolean = false
+
+  /** Checks whether the task has completed. */
+  def isCompleted: Boolean = completed

-  // Whether the task has completed, before the onCompleteCallbacks are executed.
-  @volatile var completed: Boolean = false
+  /** Checks whether the task has been killed. */
+  def isInterrupted: Boolean = interrupted
+
+  // TODO: Also track whether the task has completed successfully or with exception.
+
+  /**
+   * Add a (Java friendly) listener to be executed on task completion.
+   * This will be called in all situation - success, failure, or cancellation.
+   *
+   * An example use is for HadoopRDD to register a callback to close the input stream.
+   */
+  def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {
+    onCompleteCallbacks += listener
+    this
+  }
+
+  /**
+   * Add a listener in the form of a Scala closure to be executed on task completion.
+   * This will be called in all situation - success, failure, or cancellation.
+   *
+   * An example use is for HadoopRDD to register a callback to close the input stream.
+   */
+  def addTaskCompletionListener(f: TaskContext => Unit): this.type = {
+    onCompleteCallbacks += new TaskCompletionListener {
+      override def onTaskCompletion(context: TaskContext): Unit = f(context)
+    }
+    this
+  }

   /**
    * Add a callback function to be executed on task completion. An example use
    * is for HadoopRDD to register a callback to close the input stream.
    * Will be called in any situation - success, failure, or cancellation.
    * @param f Callback function.
    */
+  @deprecated("use addTaskCompletionListener", "1.1.0")
   def addOnCompleteCallback(f: () => Unit) {
-    onCompleteCallbacks += f
+    onCompleteCallbacks += new TaskCompletionListener {
+      override def onTaskCompletion(context: TaskContext): Unit = f()
+    }
   }

-  def executeOnCompleteCallbacks() {
+  /** Marks the task as completed and triggers the listeners. */
+  private[spark] def markTaskCompleted(): Unit = {
     completed = true
     // Process complete callbacks in the reverse order of registration
-    onCompleteCallbacks.reverse.foreach { _() }
+    onCompleteCallbacks.reverse.foreach { _.onTaskCompletion(this) }
+  }
+
+  /** Marks the task for interruption, i.e. cancellation. */
+  private[spark] def markInterrupted(): Unit = {
+    interrupted = true
   }
 }
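
For code running inside a task, the closure overload of addTaskCompletionListener replaces the now-deprecated addOnCompleteCallback, and the isCompleted/isInterrupted getters replace the formerly public vars. A minimal usage sketch, with a hypothetical stream-opening function standing in for a real input source:

import java.io.InputStream
import org.apache.spark.TaskContext

// Hypothetical per-partition resource that must be released when the task ends.
def readPartition(context: TaskContext, openStream: () => InputStream): Iterator[Int] = {
  val in = openStream()
  // Runs on success, failure, or cancellation -- the HadoopRDD-style cleanup hook.
  context.addTaskCompletionListener { _ => in.close() }
  Iterator.continually(in.read()).takeWhile(b => b != -1 && !context.isInterrupted)
}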

core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

Lines changed: 2 additions & 1 deletion
@@ -19,6 +19,7 @@ package org.apache.spark.api.python

 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SerializableWritable, SparkException}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
@@ -42,7 +43,7 @@ private[python] object Converter extends Logging {
       defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
     converterClass.map { cc =>
       Try {
-        val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
+        val c = Utils.classForName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
         logInfo(s"Loaded converter: $cc")
         c
       } match {
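
Utils.classForName itself is not shown in this commit; the usual motivation for such a helper over a bare Class.forName is resolution through the thread's context classloader, so that user-supplied jars are visible. A sketch of that pattern, offered as an assumption about what the helper stands in for rather than the actual Utils code:

object ClassLoadingSketch {
  // Assumed behaviour (not the real org.apache.spark.util.Utils): prefer the
  // context classloader, falling back to this class's own loader.
  def classForName(name: String): Class[_] = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    Class.forName(name, true, loader)
  }
}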

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 26 additions & 18 deletions
@@ -68,7 +68,7 @@ private[spark] class PythonRDD(
     // Start a thread to feed the process input from our parent's iterator
     val writerThread = new WriterThread(env, worker, split, context)

-    context.addOnCompleteCallback { () =>
+    context.addTaskCompletionListener { context =>
       writerThread.shutdownOnTaskCompletion()

       // Cleanup the worker socket. This will also cause the Python worker to exit.
@@ -137,7 +137,7 @@ private[spark] class PythonRDD(
         }
       } catch {

-        case e: Exception if context.interrupted =>
+        case e: Exception if context.isInterrupted =>
           logDebug("Exception thrown after task interruption", e)
           throw new TaskKilledException

@@ -176,7 +176,7 @@ private[spark] class PythonRDD(

     /** Terminates the writer thread, ignoring any exceptions that may occur due to cleanup. */
     def shutdownOnTaskCompletion() {
-      assert(context.completed)
+      assert(context.isCompleted)
       this.interrupt()
     }

@@ -209,7 +209,7 @@ private[spark] class PythonRDD(
         PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
         dataOut.flush()
       } catch {
-        case e: Exception if context.completed || context.interrupted =>
+        case e: Exception if context.isCompleted || context.isInterrupted =>
           logDebug("Exception thrown after task completion (likely due to cleanup)", e)

         case e: Exception =>
@@ -235,10 +235,10 @@ private[spark] class PythonRDD(
     override def run() {
       // Kill the worker if it is interrupted, checking until task completion.
       // TODO: This has a race condition if interruption occurs, as completed may still become true.
-      while (!context.interrupted && !context.completed) {
+      while (!context.isInterrupted && !context.isCompleted) {
         Thread.sleep(2000)
       }
-      if (!context.completed) {
+      if (!context.isCompleted) {
         try {
           logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
           env.destroyPythonWorker(pythonExec, envVars.toMap, worker)
@@ -315,6 +315,14 @@ private[spark] object PythonRDD extends Logging {
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }

+  def readBroadcastFromFile(sc: JavaSparkContext, filename: String): Broadcast[Array[Byte]] = {
+    val file = new DataInputStream(new FileInputStream(filename))
+    val length = file.readInt()
+    val obj = new Array[Byte](length)
+    file.readFully(obj)
+    sc.broadcast(obj)
+  }
+
   def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
     // The right way to implement this would be to use TypeTags to get the full
     // type of T. Since I don't want to introduce breaking changes throughout the
@@ -372,8 +380,8 @@ private[spark] object PythonRDD extends Logging {
       batchSize: Int) = {
     val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
     val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@@ -440,9 +448,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
-    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
     if (path.isDefined) {
       sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
     } else {
@@ -509,9 +517,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
-    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
     if (path.isDefined) {
       sc.sc.hadoopFile(path.get, fc, kc, vc)
     } else {
@@ -558,7 +566,7 @@ private[spark] object PythonRDD extends Logging {
     for {
       k <- Option(keyClass)
       v <- Option(valueClass)
-    } yield (Class.forName(k), Class.forName(v))
+    } yield (Utils.classForName(k), Utils.classForName(v))
   }

   private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
@@ -621,10 +629,10 @@ private[spark] object PythonRDD extends Logging {
     val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
       inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
     val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
-    val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+    val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new JavaToWritableConverter)
-    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
     converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
   }

@@ -653,7 +661,7 @@ private[spark] object PythonRDD extends Logging {
     val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new JavaToWritableConverter)
-    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
     converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
   }
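
The new readBroadcastFromFile expects the file to contain a 4-byte length header followed by exactly that many payload bytes, which is what DataInputStream.readInt and readFully consume. A small sketch of the writer side that would produce such a file; the path and payload are illustrative:

import java.io.{DataOutputStream, FileOutputStream}

val payload: Array[Byte] = "example broadcast payload".getBytes("UTF-8")
val out = new DataOutputStream(new FileOutputStream("/tmp/broadcast.bin"))
try {
  out.writeInt(payload.length)  // 4-byte big-endian length header
  out.write(payload)            // raw payload bytes
} finally {
  out.close()
}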

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 5 additions & 0 deletions
@@ -46,6 +46,11 @@ private[spark] class ApplicationInfo(

   init()

+  private def readObject(in: java.io.ObjectInputStream): Unit = {
+    in.defaultReadObject()
+    init()
+  }
+
   private def init() {
     state = ApplicationState.WAITING
     executors = new mutable.HashMap[Int, ExecutorInfo]
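
The added readObject is the standard Java serialization hook for repopulating transient state: defaultReadObject restores the serialized fields, then init() rebuilds whatever default deserialization skips. A self-contained sketch of the same idiom, independent of ApplicationInfo:

import java.io.ObjectInputStream

class CachedGreeting(val name: String) extends Serializable {
  @transient private var greeting: String = _
  init()

  // Re-run init() after default deserialization, mirroring the change above.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    init()
  }

  private def init(): Unit = { greeting = s"hello, $name" }
  def greet: String = greeting
}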

core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala

Lines changed: 2 additions & 2 deletions
@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 import org.apache.spark.metrics.source.Source

 class ApplicationSource(val application: ApplicationInfo) extends Source {
-  val metricRegistry = new MetricRegistry()
-  val sourceName = "%s.%s.%s".format("application", application.desc.name,
+  override val metricRegistry = new MetricRegistry()
+  override val sourceName = "%s.%s.%s".format("application", application.desc.name,
     System.currentTimeMillis())

   metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {

core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala

Lines changed: 2 additions & 2 deletions
@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 import org.apache.spark.metrics.source.Source

 private[spark] class MasterSource(val master: Master) extends Source {
-  val metricRegistry = new MetricRegistry()
-  val sourceName = "master"
+  override val metricRegistry = new MetricRegistry()
+  override val sourceName = "master"

   // Gauge for worker numbers in cluster
   metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {
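
The new override modifiers here and in ApplicationSource above suggest that the Source trait now declares metricRegistry and sourceName as abstract members rather than leaving them to convention. A self-contained sketch of that shape, using a stand-in trait rather than the actual org.apache.spark.metrics.source.Source:

import com.codahale.metrics.{Gauge, MetricRegistry}

// Stand-in for the assumed shape of the Source trait after this change.
trait SourceShape {
  def metricRegistry: MetricRegistry
  def sourceName: String
}

class WorkerCountSource(workerCount: () => Int) extends SourceShape {
  override val metricRegistry = new MetricRegistry()
  override val sourceName = "exampleMaster"

  metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {
    override def getValue: Int = workerCount()
  })
}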
