
Commit af55030: Merge in master

2 parents: ca27da0 + 74cd46e

263 files changed: +7331 additions, -4688 deletions


R/pkg/DESCRIPTION (1 addition, 0 deletions)

@@ -62,3 +62,4 @@ Collate:
 RoxygenNote: 5.0.1
 VignetteBuilder: knitr
 NeedsCompilation: no
+Encoding: UTF-8

R/pkg/tests/fulltests/test_sparkSQL.R (1 addition, 6 deletions)

@@ -496,12 +496,7 @@ test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
   expect_true(is.na(DF$date[2]))
   expect_equal(DF$date[1], as.Date("2016-10-01"))
   expect_true(is.na(DF$time[2]))
-  # Warnings were suppressed due to Jenkins environment issues.
-  # It shows both warnings as below in Jenkins:
-  # - Your system is mis-configured: /etc/localtime is not a symlink
-  # - It is strongly recommended to set environment variable TZ to
-  #   America/Los_Angeles (or equivalent)
-  suppressWarnings(expect_equal(DF$time[1], as.POSIXlt("2016-01-10")))
+  expect_equal(DF$time[1], as.POSIXlt("2016-01-10"))
 })
 
 test_that("create DataFrame with complex types", {

R/run-tests.sh (3 additions, 4 deletions)

@@ -31,10 +31,9 @@ NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"
 # Also run the documentation tests for CRAN
 CRAN_CHECK_LOG_FILE=$FWDIR/cran-check.out
 rm -f $CRAN_CHECK_LOG_FILE
-# TODO(SPARK-30737) reenable this once packages are correctly installed in Jenkins
-# NO_TESTS=1 NO_MANUAL=1 $FWDIR/check-cran.sh 2>&1 | tee -a $CRAN_CHECK_LOG_FILE
-# FAILED=$((PIPESTATUS[0]||$FAILED))
-touch $CRAN_CHECK_LOG_FILE
+
+NO_TESTS=1 NO_MANUAL=1 $FWDIR/check-cran.sh 2>&1 | tee -a $CRAN_CHECK_LOG_FILE
+FAILED=$((PIPESTATUS[0]||$FAILED))
 
 NUM_CRAN_WARNING="$(grep -c WARNING$ $CRAN_CHECK_LOG_FILE)"
 NUM_CRAN_ERROR="$(grep -c ERROR$ $CRAN_CHECK_LOG_FILE)"

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala (18 additions, 13 deletions)

@@ -37,24 +37,29 @@ private[spark] trait ExecutorAllocationClient {
   /**
    * Update the cluster manager on our scheduling needs. Three bits of information are included
    * to help it make decisions.
-   * @param numExecutors The total number of executors we'd like to have. The cluster manager
-   *                     shouldn't kill any running executor to reach this number, but,
-   *                     if all existing executors were to die, this is the number of executors
-   *                     we'd want to be allocated.
-   * @param localityAwareTasks The number of tasks in all active stages that have a locality
-   *                           preference. This includes running, pending, and completed tasks.
-   * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
-   *                             that would like to run on that host.
-   *                             This includes running, pending, and completed tasks.
+   *
+   * @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per
+   *                                        ResourceProfile id. The cluster manager shouldn't kill
+   *                                        any running executor to reach this number, but, if all
+   *                                        existing executors were to die, this is the number
+   *                                        of executors we'd want to be allocated.
+   * @param numLocalityAwareTasksPerResourceProfileId The number of tasks in all active stages that
+   *                                                  have a locality preference, per
+   *                                                  ResourceProfile id. This includes running,
+   *                                                  pending, and completed tasks.
+   * @param hostToLocalTaskCount A map of ResourceProfile id to a map of hosts to the number of
+   *                             tasks from all active stages that would like to run on
+   *                             that host. This includes running, pending, and completed tasks.
    * @return whether the request is acknowledged by the cluster manager.
    */
   private[spark] def requestTotalExecutors(
-      numExecutors: Int,
-      localityAwareTasks: Int,
-      hostToLocalTaskCount: Map[String, Int]): Boolean
+      resourceProfileIdToNumExecutors: Map[Int, Int],
+      numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
+      hostToLocalTaskCount: Map[Int, Map[String, Int]]): Boolean
 
   /**
-   * Request an additional number of executors from the cluster manager.
+   * Request an additional number of executors from the cluster manager for the default
+   * ResourceProfile.
    * @return whether the request is acknowledged by the cluster manager.
    */
   def requestExecutors(numAdditionalExecutors: Int): Boolean
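
With this change, callers describe their needs per ResourceProfile id rather than with a single set of scalars. Below is a minimal, self-contained sketch of the new calling shape; the trait and client are simplified stand-ins written for illustration, not Spark's classes, and treating id 0 as the default profile is an assumption.

// Stand-in trait that only mirrors the updated requestTotalExecutors signature.
trait AllocationClientSketch {
  def requestTotalExecutors(
      resourceProfileIdToNumExecutors: Map[Int, Int],
      numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
      hostToLocalTaskCount: Map[Int, Map[String, Int]]): Boolean
}

object RequestShapeSketch {
  def main(args: Array[String]): Unit = {
    // Dummy client that pretends the cluster manager always acknowledges the request.
    val client = new AllocationClientSketch {
      def requestTotalExecutors(
          resourceProfileIdToNumExecutors: Map[Int, Int],
          numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
          hostToLocalTaskCount: Map[Int, Map[String, Int]]): Boolean = true
    }

    val defaultProfileId = 0  // assumed id of the default ResourceProfile
    val acknowledged = client.requestTotalExecutors(
      Map(defaultProfileId -> 10),                  // want 10 executors for this profile
      Map(defaultProfileId -> 4),                   // 4 locality-aware tasks in active stages
      Map(defaultProfileId -> Map("host-a" -> 3)))  // per-host task counts for this profile
    println(s"acknowledged = $acknowledged")
  }
}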

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala (324 additions, 149 deletions)

(Large diff; not rendered here.)

core/src/main/scala/org/apache/spark/SparkConf.scala (2 additions, 1 deletion)

@@ -684,7 +684,8 @@ private[spark] object SparkConf extends Logging {
     "spark.yarn.jars" -> Seq(
       AlternateConfig("spark.yarn.jar", "2.0")),
     MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
-      AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
+      AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3"),
+      AlternateConfig("spark.maxRemoteBlockSizeFetchToMem", "3.0")),
     LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
       AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")),
     DRIVER_MEMORY_OVERHEAD.key -> Seq(
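
For context, SparkConf keeps a table of deprecated alternate keys per current key, and the new entry lets the pre-3.0 name keep working. The following is a standalone sketch of that fallback idea, not Spark's implementation; the current key name "spark.network.maxRemoteBlockSizeFetchToMem" is an assumption, since this excerpt only shows the constant MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.

object AlternateConfigSketch {
  // current key -> older keys that should still be honored
  val alternatives: Map[String, Seq[String]] = Map(
    "spark.network.maxRemoteBlockSizeFetchToMem" -> Seq(
      "spark.maxRemoteBlockSizeFetchToMem",      // deprecated in 3.0 (the mapping added here)
      "spark.reducer.maxReqSizeShuffleToMem"))   // deprecated in 2.3

  // Look up the current key first, then fall back to the first deprecated key that is set.
  def resolve(settings: Map[String, String], key: String): Option[String] =
    settings.get(key).orElse(
      alternatives.getOrElse(key, Nil).flatMap(settings.get).headOption)

  def main(args: Array[String]): Unit = {
    val userConf = Map("spark.maxRemoteBlockSizeFetchToMem" -> "200m")
    println(resolve(userConf, "spark.network.maxRemoteBlockSizeFetchToMem"))  // Some(200m)
  }
}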

core/src/main/scala/org/apache/spark/SparkContext.scala (42 additions, 108 deletions)

@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReferenc
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
+import scala.collection.immutable
 import scala.collection.mutable.HashMap
 import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
@@ -53,7 +54,7 @@ import org.apache.spark.io.CompressionCodec
 import org.apache.spark.metrics.source.JVMCPUSource
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
-import org.apache.spark.resource.{ResourceID, ResourceInformation}
+import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
@@ -219,9 +220,10 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _shutdownHookRef: AnyRef = _
   private var _statusStore: AppStatusStore = _
   private var _heartbeater: Heartbeater = _
-  private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _
+  private var _resources: immutable.Map[String, ResourceInformation] = _
   private var _shuffleDriverComponents: ShuffleDriverComponents = _
   private var _plugins: Option[PluginContainer] = None
+  private var _resourceProfileManager: ResourceProfileManager = _
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -343,6 +345,8 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] =
     _executorAllocationManager
 
+  private[spark] def resourceProfileManager: ResourceProfileManager = _resourceProfileManager
+
   private[spark] def cleaner: Option[ContextCleaner] = _cleaner
 
   private[spark] var checkpointDir: Option[String] = None
@@ -451,6 +455,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }
 
     _listenerBus = new LiveListenerBus(_conf)
+    _resourceProfileManager = new ResourceProfileManager(_conf)
 
     // Initialize the app status store and listener before SparkEnv is created so that it gets
     // all events.
@@ -611,7 +616,7 @@ class SparkContext(config: SparkConf) extends Logging {
         case b: ExecutorAllocationClient =>
           Some(new ExecutorAllocationManager(
             schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
-            cleaner = cleaner))
+            cleaner = cleaner, resourceProfileManager = resourceProfileManager))
         case _ =>
          None
      }
@@ -1622,7 +1627,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /**
    * Update the cluster manager on our scheduling needs. Three bits of information are included
-   * to help it make decisions.
+   * to help it make decisions. This applies to the default ResourceProfile.
    * @param numExecutors The total number of executors we'd like to have. The cluster manager
    *                     shouldn't kill any running executor to reach this number, but,
    *                     if all existing executors were to die, this is the number of executors
@@ -1638,11 +1643,16 @@ class SparkContext(config: SparkConf) extends Logging {
   def requestTotalExecutors(
       numExecutors: Int,
       localityAwareTasks: Int,
-      hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
+      hostToLocalTaskCount: immutable.Map[String, Int]
     ): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
+        // this is being applied to the default resource profile, would need to add api to support
+        // others
+        val defaultProfId = resourceProfileManager.defaultResourceProfile.id
+        b.requestTotalExecutors(immutable.Map(defaultProfId -> numExecutors),
+          immutable.Map(localityAwareTasks -> defaultProfId),
+          immutable.Map(defaultProfId -> hostToLocalTaskCount))
      case _ =>
        logWarning("Requesting executors is not supported by current scheduler.")
        false
@@ -2036,6 +2046,7 @@ class SparkContext(config: SparkConf) extends Logging {
     // Clear this `InheritableThreadLocal`, or it will still be inherited in child threads even this
     // `SparkContext` is stopped.
     localProperties.remove()
+    ResourceProfile.clearDefaultProfile()
     // Unset YARN mode system env variable, to allow switching between cluster types.
     SparkContext.clearActiveContext()
     logInfo("Successfully stopped SparkContext")
@@ -2771,109 +2782,34 @@ object SparkContext extends Logging {
     // When running locally, don't try to re-execute tasks on failure.
     val MAX_LOCAL_TASK_FAILURES = 1
 
-    // Ensure that executor's resources satisfies one or more tasks requirement.
-    def checkResourcesPerTask(clusterMode: Boolean, executorCores: Option[Int]): Unit = {
+    // Ensure that default executor's resources satisfies one or more tasks requirement.
+    // This function is for cluster managers that don't set the executor cores config, for
+    // others its checked in ResourceProfile.
+    def checkResourcesPerTask(executorCores: Int): Unit = {
       val taskCores = sc.conf.get(CPUS_PER_TASK)
-      val execCores = if (clusterMode) {
-        executorCores.getOrElse(sc.conf.get(EXECUTOR_CORES))
-      } else {
-        executorCores.get
-      }
-      // some cluster managers don't set the EXECUTOR_CORES config by default (standalone
-      // and mesos coarse grained), so we can't rely on that config for those.
-      val shouldCheckExecCores = executorCores.isDefined || sc.conf.contains(EXECUTOR_CORES) ||
-        (master.equalsIgnoreCase("yarn") || master.startsWith("k8s"))
-
-      // Number of cores per executor must meet at least one task requirement.
-      if (shouldCheckExecCores && execCores < taskCores) {
-        throw new SparkException(s"The number of cores per executor (=$execCores) has to be >= " +
-          s"the task config: ${CPUS_PER_TASK.key} = $taskCores when run on $master.")
-      }
-
-      // Calculate the max slots each executor can provide based on resources available on each
-      // executor and resources required by each task.
-      val taskResourceRequirements = parseResourceRequirements(sc.conf, SPARK_TASK_PREFIX)
-      val executorResourcesAndAmounts = parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
-        .map(request => (request.id.resourceName, request.amount)).toMap
-
-      var (numSlots, limitingResourceName) = if (shouldCheckExecCores) {
-        (execCores / taskCores, "CPU")
-      } else {
-        (-1, "")
-      }
-
-      taskResourceRequirements.foreach { taskReq =>
-        // Make sure the executor resources were specified through config.
-        val execAmount = executorResourcesAndAmounts.getOrElse(taskReq.resourceName,
-          throw new SparkException("The executor resource config: " +
-            new ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
-            " needs to be specified since a task requirement config: " +
-            new ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
-            " was specified")
-        )
-        // Make sure the executor resources are large enough to launch at least one task.
-        if (execAmount < taskReq.amount) {
-          throw new SparkException("The executor resource config: " +
-            new ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
-            s" = $execAmount has to be >= the requested amount in task resource config: " +
-            new ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
-            s" = ${taskReq.amount}")
-        }
-        // Compare and update the max slots each executor can provide.
-        // If the configured amount per task was < 1.0, a task is subdividing
-        // executor resources. If the amount per task was > 1.0, the task wants
-        // multiple executor resources.
-        val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
-        if (resourceNumSlots < numSlots) {
-          if (shouldCheckExecCores) {
-            throw new IllegalArgumentException("The number of slots on an executor has to be " +
-              "limited by the number of cores, otherwise you waste resources and " +
-              "dynamic allocation doesn't work properly. Your configuration has " +
-              s"core/task cpu slots = ${numSlots} and " +
-              s"${taskReq.resourceName} = ${resourceNumSlots}. " +
-              "Please adjust your configuration so that all resources require same number " +
-              "of executor slots.")
-          }
-          numSlots = resourceNumSlots
-          limitingResourceName = taskReq.resourceName
-        }
-      }
-      if(!shouldCheckExecCores && Utils.isDynamicAllocationEnabled(sc.conf)) {
-        // if we can't rely on the executor cores config throw a warning for user
-        logWarning("Please ensure that the number of slots available on your " +
-          "executors is limited by the number of cores to task cpus and not another " +
-          "custom resource. If cores is not the limiting resource then dynamic " +
-          "allocation will not work properly!")
-      }
-      // warn if we would waste any resources due to another resource limiting the number of
-      // slots on an executor
-      taskResourceRequirements.foreach { taskReq =>
-        val execAmount = executorResourcesAndAmounts(taskReq.resourceName)
-        if ((numSlots * taskReq.amount / taskReq.numParts) < execAmount) {
-          val taskReqStr = if (taskReq.numParts > 1) {
-            s"${taskReq.amount}/${taskReq.numParts}"
-          } else {
-            s"${taskReq.amount}"
-          }
-          val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
-          val message = s"The configuration of resource: ${taskReq.resourceName} " +
-            s"(exec = ${execAmount}, task = ${taskReqStr}, " +
-            s"runnable tasks = ${resourceNumSlots}) will " +
-            s"result in wasted resources due to resource ${limitingResourceName} limiting the " +
-            s"number of runnable tasks per executor to: ${numSlots}. Please adjust " +
-            s"your configuration."
-          if (Utils.isTesting) {
-            throw new SparkException(message)
-          } else {
-            logWarning(message)
-          }
-        }
+      validateTaskCpusLargeEnough(executorCores, taskCores)
+      val defaultProf = sc.resourceProfileManager.defaultResourceProfile
+      // TODO - this is temporary until all of stage level scheduling feature is integrated,
+      // fail if any other resource limiting due to dynamic allocation and scheduler using
+      // slots based on cores
+      val cpuSlots = executorCores/taskCores
+      val limitingResource = defaultProf.limitingResource(sc.conf)
+      if (limitingResource.nonEmpty && !limitingResource.equals(ResourceProfile.CPUS) &&
+        defaultProf.maxTasksPerExecutor(sc.conf) < cpuSlots) {
+        throw new IllegalArgumentException("The number of slots on an executor has to be " +
+          "limited by the number of cores, otherwise you waste resources and " +
+          "dynamic allocation doesn't work properly. Your configuration has " +
+          s"core/task cpu slots = ${cpuSlots} and " +
+          s"${limitingResource} = " +
+          s"${defaultProf.maxTasksPerExecutor(sc.conf)}. Please adjust your configuration " +
+          "so that all resources require same number of executor slots.")
       }
+      ResourceUtils.warnOnWastedResources(defaultProf, sc.conf, Some(executorCores))
     }
 
     master match {
       case "local" =>
-        checkResourcesPerTask(clusterMode = false, Some(1))
+        checkResourcesPerTask(1)
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
         scheduler.initialize(backend)
@@ -2886,7 +2822,7 @@ object SparkContext extends Logging {
         if (threadCount <= 0) {
           throw new SparkException(s"Asked to run locally with $threadCount threads")
         }
-        checkResourcesPerTask(clusterMode = false, Some(threadCount))
+        checkResourcesPerTask(threadCount)
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
@@ -2897,22 +2833,21 @@ object SparkContext extends Logging {
         // local[*, M] means the number of cores on the computer with M failures
         // local[N, M] means exactly N threads with M failures
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
-        checkResourcesPerTask(clusterMode = false, Some(threadCount))
+        checkResourcesPerTask(threadCount)
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)
 
       case SPARK_REGEX(sparkUrl) =>
-        checkResourcesPerTask(clusterMode = true, None)
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
 
       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-        checkResourcesPerTask(clusterMode = true, Some(coresPerSlave.toInt))
+        checkResourcesPerTask(coresPerSlave.toInt)
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
@@ -2941,7 +2876,6 @@ object SparkContext extends Logging {
        (backend, scheduler)
 
       case masterUrl =>
-        checkResourcesPerTask(clusterMode = true, None)
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
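
The rewritten checkResourcesPerTask keeps only one guard locally: executor cores divided by spark.task.cpus gives the CPU slots per executor, and if some other resource in the default ResourceProfile allows fewer tasks than that, the configuration is rejected. Below is a simplified, self-contained sketch of that guard with illustrative names and numbers; it is not the commit's code.

object SlotCheckSketch {
  // Reject configurations where a non-CPU resource would cap tasks below the CPU slot count.
  def check(executorCores: Int, taskCpus: Int, limitingResource: String, maxTasksPerExecutor: Int): Unit = {
    val cpuSlots = executorCores / taskCpus
    if (limitingResource.nonEmpty && limitingResource != "cpus" && maxTasksPerExecutor < cpuSlots) {
      throw new IllegalArgumentException(
        s"core/task cpu slots = $cpuSlots but $limitingResource allows only " +
          s"$maxTasksPerExecutor tasks per executor; adjust one side so they match.")
    }
  }

  def main(args: Array[String]): Unit = {
    check(executorCores = 4, taskCpus = 1, limitingResource = "cpus", maxTasksPerExecutor = 4)   // passes
    try {
      check(executorCores = 4, taskCpus = 1, limitingResource = "gpu", maxTasksPerExecutor = 2)  // rejected
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}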

core/src/main/scala/org/apache/spark/internal/Logging.scala (1 addition, 1 deletion)

@@ -117,7 +117,7 @@ trait Logging {
   }
 
   // For testing
-  def initializeForcefully(isInterpreter: Boolean, silent: Boolean): Unit = {
+  private[spark] def initializeForcefully(isInterpreter: Boolean, silent: Boolean): Unit = {
     initializeLogging(isInterpreter, silent)
   }

core/src/main/scala/org/apache/spark/internal/config/Tests.scala (9 additions, 0 deletions)

@@ -53,4 +53,13 @@ private[spark] object Tests {
   val TEST_N_CORES_EXECUTOR = ConfigBuilder("spark.testing.nCoresPerExecutor")
     .intConf
     .createWithDefault(2)
+
+  val RESOURCES_WARNING_TESTING =
+    ConfigBuilder("spark.resources.warnings.testing").booleanConf.createWithDefault(false)
+
+  val RESOURCE_PROFILE_MANAGER_TESTING =
+    ConfigBuilder("spark.testing.resourceProfileManager")
+      .booleanConf
+      .createWithDefault(false)
+
 }
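
Both new entries follow the usual ConfigBuilder pattern: a string key, a boolean type, and a false default. Below is a standalone sketch of that pattern and how such a flag resolves; BoolEntry is a stand-in written for illustration, not Spark's ConfigBuilder.

final case class BoolEntry(key: String, default: Boolean) {
  // Read the flag from whatever settings are present, falling back to the default.
  def readFrom(settings: Map[String, String]): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)
}

object TestFlagSketch {
  // Mirrors the key and default of spark.testing.resourceProfileManager added above.
  val ResourceProfileManagerTesting = BoolEntry("spark.testing.resourceProfileManager", default = false)

  def main(args: Array[String]): Unit = {
    println(ResourceProfileManagerTesting.readFrom(Map.empty))                                              // false
    println(ResourceProfileManagerTesting.readFrom(Map("spark.testing.resourceProfileManager" -> "true")))  // true
  }
}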
