
Commit 00f820e

Merge remote-tracking branch 'upstream/master' into dt-python-consistency
2 parents: fe6dbfa + 63bdb1f

File tree

32 files changed: +642 −360 lines


core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -150,7 +150,7 @@
     <dependency>
       <groupId>org.json4s</groupId>
       <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
-      <version>3.2.6</version>
+      <version>3.2.10</version>
     </dependency>
     <dependency>
       <groupId>colt</groupId>

core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala

Lines changed: 10 additions & 3 deletions
@@ -96,17 +96,23 @@ class JdbcRDD[T: ClassTag](

   override def close() {
     try {
-      if (null != rs && ! rs.isClosed()) rs.close()
+      if (null != rs && ! rs.isClosed()) {
+        rs.close()
+      }
     } catch {
       case e: Exception => logWarning("Exception closing resultset", e)
     }
     try {
-      if (null != stmt && ! stmt.isClosed()) stmt.close()
+      if (null != stmt && ! stmt.isClosed()) {
+        stmt.close()
+      }
     } catch {
       case e: Exception => logWarning("Exception closing statement", e)
     }
     try {
-      if (null != conn && ! stmt.isClosed()) conn.close()
+      if (null != conn && ! conn.isClosed()) {
+        conn.close()
+      }
       logInfo("closed connection")
     } catch {
       case e: Exception => logWarning("Exception closing connection", e)
@@ -120,3 +126,4 @@ object JdbcRDD {
     Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
   }
 }
+
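For context, a minimal JdbcRDD usage sketch (the JDBC URL, table, and query below are made up for illustration): each partition opens its own Connection, Statement, and ResultSet, which are exactly the resources the tightened close() above releases, including the corrected conn.isClosed() check.

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD

object JdbcRDDSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("JdbcRDDSketch").setMaster("local[2]"))

    // The query needs two '?' placeholders; JdbcRDD binds them to the lower and
    // upper bound of each partition's key range.
    val rows = new JdbcRDD(
      sc,
      () => DriverManager.getConnection("jdbc:h2:mem:demo"),    // hypothetical in-memory DB
      "SELECT id, name FROM users WHERE id >= ? AND id <= ?",   // hypothetical table
      1, 1000, 3,                                                // lowerBound, upperBound, numPartitions
      (rs: ResultSet) => (rs.getInt(1), rs.getString(2)))

    println(rows.count())   // close() runs for each partition once its iterator is exhausted
    sc.stop()
  }
}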

core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ import org.apache.spark.annotation.DeveloperApi
 @DeveloperApi
 object TaskLocality extends Enumeration {
   // Process local is expected to be used ONLY within TaskSetManager for now.
-  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
+  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

   type TaskLocality = Value

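A standalone sketch (not Spark code) of why the insertion point matters: Scala Enumeration values are numbered in declaration order, so placing NO_PREF between NODE_LOCAL and RACK_LOCAL makes it rank "closer" than RACK_LOCAL in any id-based comparison, such as the isAllowed-style checks the scheduler performs.

object Locality extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
}

object LocalityOrderingDemo extends App {
  // Mirrors the shape of TaskLocality.isAllowed: a task with the given
  // constraint may run when the offered level is at least as far out.
  def isAllowed(offered: Locality.Value, constraint: Locality.Value): Boolean =
    constraint <= offered

  println(Locality.values.toList.map(v => s"$v=${v.id}").mkString(", "))
  // PROCESS_LOCAL=0, NODE_LOCAL=1, NO_PREF=2, RACK_LOCAL=3, ANY=4
  println(isAllowed(Locality.NO_PREF, Locality.RACK_LOCAL))   // false
  println(isAllowed(Locality.RACK_LOCAL, Locality.NO_PREF))   // true
}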
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 4 additions & 3 deletions
@@ -89,11 +89,11 @@ private[spark] class TaskSchedulerImpl(

   // The set of executors we have on each host; this is used to compute hostsAlive, which
   // in turn is used to decide when we can attain data locality on a given host
-  private val executorsByHost = new HashMap[String, HashSet[String]]
+  protected val executorsByHost = new HashMap[String, HashSet[String]]

   protected val hostsByRack = new HashMap[String, HashSet[String]]

-  private val executorIdToHost = new HashMap[String, String]
+  protected val executorIdToHost = new HashMap[String, String]

   // Listener object to pass upcalls into
   var dagScheduler: DAGScheduler = null
@@ -249,6 +249,7 @@ private[spark] class TaskSchedulerImpl(

     // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
     // of locality levels so that it gets a chance to launch local tasks on all of them.
+    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
     var launchedTask = false
     for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
       do {
@@ -265,7 +266,7 @@ private[spark] class TaskSchedulerImpl(
           activeExecutorIds += execId
           executorsByHost(host) += execId
           availableCpus(i) -= CPUS_PER_TASK
-          assert (availableCpus(i) >= 0)
+          assert(availableCpus(i) >= 0)
           launchedTask = true
         }
       }

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 60 additions & 49 deletions
@@ -79,6 +79,7 @@ private[spark] class TaskSetManager(
   private val numFailures = new Array[Int](numTasks)
   // key is taskId, value is a Map of executor id to when it failed
   private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
+
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   var tasksSuccessful = 0

@@ -179,26 +180,17 @@ private[spark] class TaskSetManager(
       }
     }

-    var hadAliveLocations = false
     for (loc <- tasks(index).preferredLocations) {
       for (execId <- loc.executorId) {
         addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
       }
-      if (sched.hasExecutorsAliveOnHost(loc.host)) {
-        hadAliveLocations = true
-      }
       addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
       for (rack <- sched.getRackForHost(loc.host)) {
         addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
-        if(sched.hasHostAliveOnRack(rack)){
-          hadAliveLocations = true
-        }
       }
     }

-    if (!hadAliveLocations) {
-      // Even though the task might've had preferred locations, all of those hosts or executors
-      // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
+    if (tasks(index).preferredLocations == Nil) {
       addTo(pendingTasksWithNoPrefs)
     }

@@ -239,7 +231,6 @@ private[spark] class TaskSetManager(
    */
   private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
     var indexOffset = list.size
-
     while (indexOffset > 0) {
       indexOffset -= 1
       val index = list(indexOffset)
@@ -288,12 +279,12 @@ private[spark] class TaskSetManager(
       !hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)

     if (!speculatableTasks.isEmpty) {
-      // Check for process-local or preference-less tasks; note that tasks can be process-local
+      // Check for process-local tasks; note that tasks can be process-local
       // on multiple nodes when we replicate cached blocks, as in Spark Streaming
       for (index <- speculatableTasks if canRunOnHost(index)) {
         val prefs = tasks(index).preferredLocations
         val executors = prefs.flatMap(_.executorId)
-        if (prefs.size == 0 || executors.contains(execId)) {
+        if (executors.contains(execId)) {
           speculatableTasks -= index
           return Some((index, TaskLocality.PROCESS_LOCAL))
         }
@@ -310,6 +301,17 @@ private[spark] class TaskSetManager(
         }
       }

+      // Check for no-preference tasks
+      if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
+        for (index <- speculatableTasks if canRunOnHost(index)) {
+          val locations = tasks(index).preferredLocations
+          if (locations.size == 0) {
+            speculatableTasks -= index
+            return Some((index, TaskLocality.PROCESS_LOCAL))
+          }
+        }
+      }
+
       // Check for rack-local tasks
       if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
         for (rack <- sched.getRackForHost(host)) {
@@ -341,20 +343,27 @@ private[spark] class TaskSetManager(
    *
    * @return An option containing (task index within the task set, locality, is speculative?)
    */
-  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
+  private def findTask(execId: String, host: String, maxLocality: TaskLocality.Value)
     : Option[(Int, TaskLocality.Value, Boolean)] =
   {
     for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
       return Some((index, TaskLocality.PROCESS_LOCAL, false))
     }

-    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
+    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
       for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
         return Some((index, TaskLocality.NODE_LOCAL, false))
       }
     }

-    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
+    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
+      // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
+      for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
+        return Some((index, TaskLocality.PROCESS_LOCAL, false))
+      }
+    }
+
+    if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
       for {
         rack <- sched.getRackForHost(host)
         index <- findTaskFromList(execId, getPendingTasksForRack(rack))
@@ -363,25 +372,27 @@ private[spark] class TaskSetManager(
       }
     }

-    // Look for no-pref tasks after rack-local tasks since they can run anywhere.
-    for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
-      return Some((index, TaskLocality.PROCESS_LOCAL, false))
-    }
-
-    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
+    if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
       for (index <- findTaskFromList(execId, allPendingTasks)) {
         return Some((index, TaskLocality.ANY, false))
       }
     }

-    // Finally, if all else has failed, find a speculative task
-    findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) =>
-      (taskIndex, allowedLocality, true)
-    }
+    // find a speculative task if all others tasks have been scheduled
+    findSpeculativeTask(execId, host, maxLocality).map {
+      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
   }

   /**
    * Respond to an offer of a single executor from the scheduler by finding a task
+   *
+   * NOTE: this function is either called with a maxLocality which
+   * would be adjusted by delay scheduling algorithm or it will be with a special
+   * NO_PREF locality which will be not modified
+   *
+   * @param execId the executor Id of the offered resource
+   * @param host the host Id of the offered resource
+   * @param maxLocality the maximum locality we want to schedule the tasks at
    */
   def resourceOffer(
       execId: String,
@@ -392,9 +403,14 @@ private[spark] class TaskSetManager(
     if (!isZombie) {
       val curTime = clock.getTime()

-      var allowedLocality = getAllowedLocalityLevel(curTime)
-      if (allowedLocality > maxLocality) {
-        allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
+      var allowedLocality = maxLocality
+
+      if (maxLocality != TaskLocality.NO_PREF) {
+        allowedLocality = getAllowedLocalityLevel(curTime)
+        if (allowedLocality > maxLocality) {
+          // We're not allowed to search for farther-away tasks
+          allowedLocality = maxLocality
+        }
       }

       findTask(execId, host, allowedLocality) match {
@@ -410,8 +426,11 @@ private[spark] class TaskSetManager(
           taskInfos(taskId) = info
           taskAttempts(index) = info :: taskAttempts(index)
           // Update our locality level for delay scheduling
-          currentLocalityIndex = getLocalityIndex(taskLocality)
-          lastLaunchTime = curTime
+          // NO_PREF will not affect the variables related to delay scheduling
+          if (maxLocality != TaskLocality.NO_PREF) {
+            currentLocalityIndex = getLocalityIndex(taskLocality)
+            lastLaunchTime = curTime
+          }
           // Serialize and return the task
           val startTime = clock.getTime()
           // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
@@ -639,8 +658,7 @@ private[spark] class TaskSetManager(
   override def executorLost(execId: String, host: String) {
     logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)

-    // Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
-    // task that used to have locations on only this host might now go to the no-prefs list. Note
+    // Re-enqueue pending tasks for this host based on the status of the cluster. Note
     // that it's okay if we add a task to the same queue twice (if it had multiple preferred
     // locations), because findTaskFromList will skip already-running tasks.
     for (index <- getPendingTasksForExecutor(execId)) {
@@ -671,6 +689,9 @@ private[spark] class TaskSetManager(
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
       handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure)
     }
+    // recalculate valid locality levels and waits when executor is lost
+    myLocalityLevels = computeValidLocalityLevels()
+    localityWaits = myLocalityLevels.map(getLocalityWait)
   }

   /**
@@ -722,17 +743,17 @@ private[spark] class TaskSetManager(
         conf.get("spark.locality.wait.node", defaultWait).toLong
       case TaskLocality.RACK_LOCAL =>
         conf.get("spark.locality.wait.rack", defaultWait).toLong
-      case TaskLocality.ANY =>
-        0L
+      case _ => 0L
     }
   }

   /**
    * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
    * added to queues using addPendingTask.
+   *
    */
   private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
-    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
+    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
     val levels = new ArrayBuffer[TaskLocality.TaskLocality]
     if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
         pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
@@ -742,6 +763,9 @@ private[spark] class TaskSetManager(
         pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
       levels += NODE_LOCAL
     }
+    if (!pendingTasksWithNoPrefs.isEmpty) {
+      levels += NO_PREF
+    }
     if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
         pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
       levels += RACK_LOCAL
@@ -751,20 +775,7 @@ private[spark] class TaskSetManager(
     levels.toArray
   }

-  // Re-compute pendingTasksWithNoPrefs since new preferred locations may become available
   def executorAdded() {
-    def newLocAvail(index: Int): Boolean = {
-      for (loc <- tasks(index).preferredLocations) {
-        if (sched.hasExecutorsAliveOnHost(loc.host) ||
-          (sched.getRackForHost(loc.host).isDefined &&
-            sched.hasHostAliveOnRack(sched.getRackForHost(loc.host).get))) {
-          return true
-        }
-      }
-      false
-    }
-    logInfo("Re-computing pending task lists.")
-    pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_))
     myLocalityLevels = computeValidLocalityLevels()
     localityWaits = myLocalityLevels.map(getLocalityWait)
   }
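A hedged configuration sketch related to the getLocalityWait hunk above: the waits are ordinary Spark settings read with .toLong, i.e. millisecond values in this code path, and the new `case _ => 0L` branch means NO_PREF (and any other unmatched level) never waits for a better location.

import org.apache.spark.SparkConf

object LocalityWaitDemo extends App {
  // Values are milliseconds here because getLocalityWait calls .toLong on them.
  val conf = new SparkConf()
    .setAppName("locality-wait-demo")
    .set("spark.locality.wait", "3000")        // default wait applied at each level
    .set("spark.locality.wait.node", "3000")   // wait before relaxing past NODE_LOCAL
    .set("spark.locality.wait.rack", "1000")   // wait before relaxing past RACK_LOCAL
  println(conf.toDebugString)
}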

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ private[spark] class SortShuffleWriter[K, V, C](
   private val ser = Serializer.getSerializer(dep.serializer.orNull)

   private val conf = SparkEnv.get.conf
-  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

   private var sorter: ExternalSorter[K, V, _] = null
   private var outputFile: File = null

core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   val sortBasedShuffle =
     conf.get("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName

-  private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

   /**
    * Contains all the state related to a particular shuffle. This includes a pool of unused

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 1 addition & 1 deletion
@@ -101,7 +101,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private var _memoryBytesSpilled = 0L
   private var _diskBytesSpilled = 0L

-  private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
   private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 1 addition & 1 deletion
@@ -84,7 +84,7 @@ private[spark] class ExternalSorter[K, V, C](

   private val conf = SparkEnv.get.conf
   private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
-  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

   // Size of object batches when reading/writing from serializers.
   //
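The four buffer-size hunks above only change the default; here is a hedged sketch of restoring the previous 100 KB per-file shuffle buffer for a job that prefers it (the value is in kilobytes and read via getInt):

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleBufferOverride extends App {
  val conf = new SparkConf()
    .setAppName("shuffle-buffer-override")
    .setMaster("local[2]")
    .set("spark.shuffle.file.buffer.kb", "100")  // kilobytes; the new built-in default is 32

  val sc = new SparkContext(conf)
  // ... shuffle-heavy job here ...
  sc.stop()
}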
