
Commit 162fbf0

Merge branch 'master' into SPARK-30018
2 parents e5d695e + dcdc9a8


223 files changed: 4608 additions & 3350 deletions


.github/workflows/stale.yml

Lines changed: 3 additions & 2 deletions
@@ -11,12 +11,13 @@ jobs:
     - uses: actions/stale@v1.1.0
       with:
         repo-token: ${{ secrets.GITHUB_TOKEN }}
-        stale-pr-message: |
+        stale-pr-message: >
           We're closing this PR because it hasn't been updated in a while.
           This isn't a judgement on the merit of the PR in any way. It's just
           a way of keeping the PR queue manageable.
 
-          If you'd like to revive this PR, please reopen it!
+          If you'd like to revive this PR, please reopen it and ask a
+          committer to remove the Stale tag!
         days-before-stale: 100
         # Setting this to 0 is the same as setting it to 1.
         # See: https://github.com/actions/stale/issues/28

R/pkg/R/functions.R

Lines changed: 6 additions & 6 deletions
@@ -879,8 +879,8 @@ setMethod("factorial",
 #'
 #' The function by default returns the first values it sees. It will return the first non-missing
 #' value it sees when na.rm is set to true. If all values are missing, then NA is returned.
-#' Note: the function is non-deterministic because its results depends on order of rows which
-#' may be non-deterministic after a shuffle.
+#' Note: the function is non-deterministic because its results depends on the order of the rows
+#' which may be non-deterministic after a shuffle.
 #'
 #' @param na.rm a logical value indicating whether NA values should be stripped
 #'              before the computation proceeds.
@@ -1024,8 +1024,8 @@ setMethod("kurtosis",
 #'
 #' The function by default returns the last values it sees. It will return the last non-missing
 #' value it sees when na.rm is set to true. If all values are missing, then NA is returned.
-#' Note: the function is non-deterministic because its results depends on order of rows which
-#' may be non-deterministic after a shuffle.
+#' Note: the function is non-deterministic because its results depends on the order of the rows
+#' which may be non-deterministic after a shuffle.
 #'
 #' @param x column to compute on.
 #' @param na.rm a logical value indicating whether NA values should be stripped
@@ -3706,7 +3706,7 @@ setMethod("create_map",
 #' @details
 #' \code{collect_list}: Creates a list of objects with duplicates.
 #' Note: the function is non-deterministic because the order of collected results depends
-#' on order of rows which may be non-deterministic after a shuffle.
+#' on the order of the rows which may be non-deterministic after a shuffle.
 #'
 #' @rdname column_aggregate_functions
 #' @aliases collect_list collect_list,Column-method
@@ -3727,7 +3727,7 @@ setMethod("collect_list",
 #' @details
 #' \code{collect_set}: Creates a list of objects with duplicate elements eliminated.
 #' Note: the function is non-deterministic because the order of collected results depends
-#' on order of rows which may be non-deterministic after a shuffle.
+#' on the order of the rows which may be non-deterministic after a shuffle.
 #'
 #' @rdname column_aggregate_functions
 #' @aliases collect_set collect_set,Column-method
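
The reworded note describes real behaviour: without an explicit ordering, first/last/collect_list see rows in whatever order the shuffle delivers them. A minimal Scala sketch of the same effect (assuming a local SparkSession; not part of this commit):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.first

val spark = SparkSession.builder().master("local[4]").appName("first-is-order-dependent").getOrCreate()
import spark.implicits._

// All three rows share the key "a"; after repartition the aggregator may
// receive them in any order, so first("v") can change between runs.
val df = Seq(("a", 1), ("a", 2), ("a", 3)).toDF("k", "v").repartition(4)
df.groupBy("k").agg(first("v")).show()

spark.stop()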

common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java

Lines changed: 4 additions & 4 deletions
@@ -57,7 +57,7 @@ private static class StreamState {
     int curChunk = 0;
 
     // Used to keep track of the number of chunks being transferred and not finished yet.
-    volatile long chunksBeingTransferred = 0L;
+    final AtomicLong chunksBeingTransferred = new AtomicLong(0L);
 
     StreamState(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
       this.appId = appId;
@@ -153,7 +153,7 @@ public void checkAuthorization(TransportClient client, long streamId) {
   public void chunkBeingSent(long streamId) {
     StreamState streamState = streams.get(streamId);
     if (streamState != null) {
-      streamState.chunksBeingTransferred++;
+      streamState.chunksBeingTransferred.incrementAndGet();
     }
 
   }
@@ -167,7 +167,7 @@ public void streamBeingSent(String streamId) {
   public void chunkSent(long streamId) {
     StreamState streamState = streams.get(streamId);
     if (streamState != null) {
-      streamState.chunksBeingTransferred--;
+      streamState.chunksBeingTransferred.decrementAndGet();
     }
   }
 
@@ -180,7 +180,7 @@ public void streamSent(String streamId) {
   public long chunksBeingTransferred() {
     long sum = 0L;
     for (StreamState streamState: streams.values()) {
-      sum += streamState.chunksBeingTransferred;
+      sum += streamState.chunksBeingTransferred.get();
     }
     return sum;
   }
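
The switch to AtomicLong matters because `chunksBeingTransferred++` on a volatile long is a read-modify-write, so concurrent senders can overwrite each other's updates. A small Scala sketch of the fixed pattern (hypothetical class names, not the Spark code):

import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for StreamState: several threads record chunk
// transfers concurrently, so the counter must be updated atomically.
final class TransferCounter {
  private val chunksBeingTransferred = new AtomicLong(0L)

  def chunkBeingSent(): Unit = chunksBeingTransferred.incrementAndGet()
  def chunkSent(): Unit = chunksBeingTransferred.decrementAndGet()
  def current: Long = chunksBeingTransferred.get()
}

object TransferCounterDemo extends App {
  val counter = new TransferCounter
  val senders = (1 to 4).map { _ =>
    new Thread(() => (1 to 100000).foreach(_ => counter.chunkBeingSent()))
  }
  senders.foreach(_.start())
  senders.foreach(_.join())
  // With a plain volatile long and `+= 1`, lost updates would usually make
  // this total come out short of 400000.
  println(counter.current)
}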

core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java

Lines changed: 1 addition & 1 deletion
@@ -59,10 +59,10 @@ private boolean refill() throws IOException {
       while (nRead == 0) {
         nRead = fileChannel.read(byteBuffer);
       }
+      byteBuffer.flip();
       if (nRead < 0) {
         return false;
       }
-      byteBuffer.flip();
     }
     return true;
   }
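
Moving flip() above the EOF check matters because a cleared buffer still reports remaining capacity, while flipping an unread buffer zeroes its limit, so later reads correctly see it as empty (this is what the new testReadPastEOF test further down exercises). A tiny Scala illustration of the ByteBuffer states involved:

import java.nio.ByteBuffer

// After clear(), position = 0 and limit = capacity, so the buffer "has
// remaining" even though nothing was read into it.
val buf = ByteBuffer.allocate(8)
buf.clear()
println(buf.hasRemaining) // true

// flip() with no data read sets limit = 0, so readers see an empty buffer.
buf.flip()
println(buf.hasRemaining) // false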

core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 1 addition & 1 deletion
@@ -611,7 +611,7 @@ private[spark] class PythonRunner(funcs: Seq[ChainedPythonFunctions])
           val obj = new Array[Byte](length)
           stream.readFully(obj)
           obj
-        case 0 => Array.empty[Byte]
+        case 0 => Array.emptyByteArray
         case SpecialLengths.TIMING_DATA =>
           handleTimingData()
           read()
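
Array.emptyByteArray returns a cached zero-length array, while Array.empty[Byte] allocates a fresh one on every call, so the change saves a small allocation on a hot read path. A quick check in the Scala REPL:

// emptyByteArray is a shared constant; empty[Byte] builds a new array each time.
println(Array.emptyByteArray eq Array.emptyByteArray) // true
println(Array.empty[Byte] eq Array.empty[Byte])       // false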

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 2 additions & 1 deletion
@@ -1507,7 +1507,8 @@ package object config {
       "longer time than the threshold. This config helps speculate stage with very few " +
       "tasks. Regular speculation configs may also apply if the executor slots are " +
       "large enough. E.g. tasks might be re-launched if there are enough successful runs " +
-      "even though the threshold hasn't been reached.")
+      "even though the threshold hasn't been reached. The number of slots is computed based " +
+      "on the conf values of spark.executor.cores and spark.task.cpus minimum 1.")
     .timeConf(TimeUnit.MILLISECONDS)
     .createOptional

core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala

Lines changed: 16 additions & 7 deletions
@@ -119,18 +119,22 @@ private[spark] object ResourceUtils extends Logging {
   def parseAllResourceRequests(
       sparkConf: SparkConf,
       componentName: String): Seq[ResourceRequest] = {
-    listResourceIds(sparkConf, componentName).map { id =>
-      parseResourceRequest(sparkConf, id)
-    }
+    listResourceIds(sparkConf, componentName)
+      .map(id => parseResourceRequest(sparkConf, id))
+      .filter(_.amount > 0)
   }
 
   def parseResourceRequirements(sparkConf: SparkConf, componentName: String)
     : Seq[ResourceRequirement] = {
-    listResourceIds(sparkConf, componentName).map { resourceId =>
+    val resourceIds = listResourceIds(sparkConf, componentName)
+    val rnamesAndAmounts = resourceIds.map { resourceId =>
       val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap
       val amountDouble = settings.getOrElse(AMOUNT,
         throw new SparkException(s"You must specify an amount for ${resourceId.resourceName}")
       ).toDouble
+      (resourceId.resourceName, amountDouble)
+    }
+    rnamesAndAmounts.filter { case (_, amount) => amount > 0 }.map { case (rName, amountDouble) =>
       val (amount, parts) = if (componentName.equalsIgnoreCase(SPARK_TASK_PREFIX)) {
         val parts = if (amountDouble <= 0.5) {
           Math.floor(1.0 / amountDouble).toInt
@@ -147,7 +151,7 @@ private[spark] object ResourceUtils extends Logging {
       } else {
         (amountDouble.toInt, 1)
       }
-      ResourceRequirement(resourceId.resourceName, amount, parts)
+      ResourceRequirement(rName, amount, parts)
     }
   }
 
@@ -184,10 +188,15 @@ private[spark] object ResourceUtils extends Logging {
     val allocated = resourcesFileOpt.toSeq.flatMap(parseAllocatedFromJsonFile)
       .filter(_.id.componentName == componentName)
     val otherResourceIds = listResourceIds(sparkConf, componentName).diff(allocated.map(_.id))
-    allocated ++ otherResourceIds.map { id =>
+    val otherResources = otherResourceIds.flatMap { id =>
       val request = parseResourceRequest(sparkConf, id)
-      ResourceAllocation(id, discoverResource(request).addresses)
+      if (request.amount > 0) {
+        Some(ResourceAllocation(id, discoverResource(request).addresses))
+      } else {
+        None
+      }
     }
+    allocated ++ otherResources
   }
 
   private def assertResourceAllocationMeetsRequest(
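
Besides dropping amount-0 entries, the surviving branch turns a fractional task amount into a sharing factor. A standalone sketch of that arithmetic (a hypothetical helper, not the Spark method; the sample values match the fractional-resource test further down):

// For task resources, an amount <= 0.5 means "this many tasks may share one
// address": parts = floor(1 / amount). Whole-number amounts keep parts = 1.
def numParts(amountDouble: Double): Int =
  if (amountDouble <= 0.5) math.floor(1.0 / amountDouble).toInt else 1

println(numParts(0.125)) // 8 tasks per GPU address
println(numParts(0.11))  // 9
println(numParts(2.0))   // 1 (each task takes whole addresses)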

core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala

Lines changed: 23 additions & 12 deletions
@@ -22,6 +22,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.concurrent.Promise
+import scala.util.control.NonFatal
 
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
@@ -44,16 +45,6 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
   private val shutdownLatch = new CountDownLatch(1)
   private lazy val sharedLoop = new SharedMessageLoop(nettyEnv.conf, this, numUsableCores)
 
-  private def getMessageLoop(name: String, endpoint: RpcEndpoint): MessageLoop = {
-    endpoint match {
-      case e: IsolatedRpcEndpoint =>
-        new DedicatedMessageLoop(name, e, this)
-      case _ =>
-        sharedLoop.register(name, endpoint)
-        sharedLoop
-    }
-  }
-
   /**
    * True if the dispatcher has been stopped. Once stopped, all messages posted will be bounced
    * immediately.
@@ -68,11 +59,31 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
       if (stopped) {
         throw new IllegalStateException("RpcEnv has been stopped")
       }
-      if (endpoints.putIfAbsent(name, getMessageLoop(name, endpoint)) != null) {
+      if (endpoints.containsKey(name)) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
+
+      // This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
+      // active when registering, and endpointRef must be put into endpointRefs before onStart is
+      // called.
+      endpointRefs.put(endpoint, endpointRef)
+
+      var messageLoop: MessageLoop = null
+      try {
+        messageLoop = endpoint match {
+          case e: IsolatedRpcEndpoint =>
+            new DedicatedMessageLoop(name, e, this)
+          case _ =>
+            sharedLoop.register(name, endpoint)
+            sharedLoop
+        }
+        endpoints.put(name, messageLoop)
+      } catch {
+        case NonFatal(e) =>
+          endpointRefs.remove(endpoint)
+          throw e
+      }
     }
-    endpointRefs.put(endpoint, endpointRef)
     endpointRef
   }
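
The reordering publishes the endpoint ref before the message loop is created, because a dedicated loop can start delivering OnStart immediately; the try/catch then rolls the ref back if loop construction fails. A generic Scala sketch of that register-then-rollback pattern (hypothetical names, not the Spark classes):

import java.util.concurrent.ConcurrentHashMap
import scala.util.control.NonFatal

// Hypothetical registry showing the same ordering: publish the lookup entry
// first (the worker may need it as soon as it starts), then build the worker,
// and undo the entry if construction throws.
final class Registry[K, V, W](createWorker: (K, V) => W) {
  private val refs = new ConcurrentHashMap[K, V]()
  private val workers = new ConcurrentHashMap[K, W]()

  def register(key: K, ref: V): V = {
    if (workers.containsKey(key)) {
      throw new IllegalArgumentException(s"Already registered: $key")
    }
    refs.put(key, ref) // must be visible before the worker's startup hook runs
    try {
      workers.put(key, createWorker(key, ref))
    } catch {
      case NonFatal(e) =>
        refs.remove(key) // roll back the partial registration
        throw e
    }
    ref
  }
}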

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 7 additions & 1 deletion
@@ -87,7 +87,13 @@ private[spark] class TaskSetManager(
   // number of slots on a single executor, would the task manager speculative run the tasks if
   // their duration is longer than the given threshold. In this way, we wouldn't speculate too
   // aggressively but still handle basic cases.
-  val speculationTasksLessEqToSlots = numTasks <= (conf.get(EXECUTOR_CORES) / sched.CPUS_PER_TASK)
+  // SPARK-30417: #cores per executor might not be set in spark conf for standalone mode, then
+  // the value of the conf would 1 by default. However, the executor would use all the cores on
+  // the worker. Therefore, CPUS_PER_TASK is okay to be greater than 1 without setting #cores.
+  // To handle this case, we assume the minimum number of slots is 1.
+  // TODO: use the actual number of slots for standalone mode.
+  val speculationTasksLessEqToSlots =
+    numTasks <= Math.max(conf.get(EXECUTOR_CORES) / sched.CPUS_PER_TASK, 1)
 
   // For each task, tracks whether a copy of the task has succeeded. A task will also be
   // marked as "succeeded" if it failed with a fetch failure, in which case it should not
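
A quick worked example of why the Math.max(..., 1) clamp is needed (illustrative values, not read from a real SparkConf): in standalone mode spark.executor.cores can be left at its default of 1 while spark.task.cpus is 2, so the old integer division yields 0 slots and the predicate could never hold.

val executorCores = 1 // default when spark.executor.cores is not set
val cpusPerTask   = 2 // spark.task.cpus
val numTasks      = 1

val oldSlots = executorCores / cpusPerTask              // 0
val newSlots = math.max(executorCores / cpusPerTask, 1) // clamped to 1

println(numTasks <= oldSlots) // false: a single-task stage never qualifies
println(numTasks <= newSlots) // true: it can still be speculated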

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 4 additions & 4 deletions
@@ -95,9 +95,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   // Executors we have requested the cluster manager to kill that have not died yet; maps
   // the executor ID to whether it was explicitly killed by the driver (and thus shouldn't
-  // be considered an app-related failure).
+  // be considered an app-related failure). Visible for testing only.
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
-  private val executorsPendingToRemove = new HashMap[String, Boolean]
+  private[scheduler] val executorsPendingToRemove = new HashMap[String, Boolean]
 
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   private val executorsPendingLossReason = new HashSet[String]
@@ -487,12 +487,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
    * be called in the yarn-client mode when AM re-registers after a failure.
+   * Visible for testing only.
    * */
-  protected def reset(): Unit = {
+  protected[scheduler] def reset(): Unit = {
     val executors: Set[String] = synchronized {
       requestedTotalExecutors = 0
       numPendingExecutors = 0
-      executorsPendingToRemove.clear()
       executorDataMap.keys.toSet
     }

core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
     // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
     // after idle some time)
     if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) {
-      System.err.print(CR + bar)
+      System.err.print(CR + bar + CR)
       lastUpdateTime = now
     }
     lastProgressBar = bar
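
The extra carriage return parks the cursor back at column 0 after the bar is drawn, so the next thing written to the terminal starts on top of the bar rather than being appended after it. A minimal Scala illustration (illustrative bar text, not Spark's formatting):

val CR = "\r"
val bar = "[Stage 0:=====>              (3 + 1) / 8]"

// Draw the bar, then return to column 0 so any later output starts at the
// beginning of the line, over the bar, instead of trailing after it.
System.err.print(CR + bar + CR)
System.err.println("INFO some unrelated log line")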

core/src/main/scala/org/apache/spark/util/SizeEstimator.scala

Lines changed: 1 addition & 1 deletion
@@ -326,7 +326,7 @@ object SizeEstimator extends Logging {
     val parent = getClassInfo(cls.getSuperclass)
     var shellSize = parent.shellSize
     var pointerFields = parent.pointerFields
-    val sizeCount = Array.fill(fieldSizes.max + 1)(0)
+    val sizeCount = Array.ofDim[Int](fieldSizes.max + 1)
 
     // iterate through the fields of this class and gather information.
     for (field <- cls.getDeclaredFields) {
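
Both calls yield a zero-initialized Int array, but Array.ofDim only allocates it, while Array.fill also loops over every slot re-evaluating the element expression; for a plain counter array that loop is pure overhead. A quick equivalence check:

// Same contents either way; ofDim skips the per-element fill loop.
val counted = Array.fill(4)(0)
val allocated = Array.ofDim[Int](4)
println(counted.sameElements(allocated)) // true: both are Array(0, 0, 0, 0)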

core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java

Lines changed: 16 additions & 1 deletion
@@ -48,8 +48,12 @@ public void setUp() throws IOException {
   }
 
   @After
-  public void tearDown() {
+  public void tearDown() throws IOException {
     inputFile.delete();
+
+    for (InputStream is : inputStreams) {
+      is.close();
+    }
   }
 
   @Test
@@ -141,4 +145,15 @@ public void testBytesSkippedAfterEOF() throws IOException {
       assertEquals(-1, inputStream.read());
     }
   }
+
+  @Test
+  public void testReadPastEOF() throws IOException {
+    InputStream is = inputStreams[0];
+    byte[] buf = new byte[1024];
+    int read;
+    while ((read = is.read(buf, 0, buf.length)) != -1);
+
+    int readAfterEOF = is.read(buf, 0, buf.length);
+    assertEquals(-1, readAfterEOF);
+  }
 }

core/src/test/scala/org/apache/spark/SparkConfSuite.scala

Lines changed: 13 additions & 0 deletions
@@ -457,6 +457,19 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     assert(taskResourceRequirement.get(FPGA).isEmpty)
   }
 
+  test("test task resource requirement with 0 amount") {
+    val conf = new SparkConf()
+    conf.set(TASK_GPU_ID.amountConf, "2")
+    conf.set(TASK_FPGA_ID.amountConf, "0")
+    var taskResourceRequirement =
+      parseResourceRequirements(conf, SPARK_TASK_PREFIX)
+        .map(req => (req.resourceName, req.amount)).toMap
+
+    assert(taskResourceRequirement.size == 1)
+    assert(taskResourceRequirement(GPU) == 2)
+  }
+
+
   test("Ensure that we can configure fractional resources for a task") {
     val ratioSlots = Seq(
       (0.10, 10), (0.11, 9), (0.125, 8), (0.14, 7), (0.16, 6),

core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala

Lines changed: 21 additions & 0 deletions
@@ -60,6 +60,20 @@ class ResourceUtilsSuite extends SparkFunSuite
     }
   }
 
+  test("Resource discoverer amount 0") {
+    val conf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val scriptPath = createTempScriptWithExpectedOutput(dir, "gpuDiscoverScript",
+        """{"name": "gpu"}""")
+      conf.set(EXECUTOR_GPU_ID.amountConf, "0")
+      conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, scriptPath)
+
+      val res = getOrDiscoverAllResources(conf, SPARK_EXECUTOR_PREFIX, None)
+      assert(res.isEmpty)
+    }
+  }
+
   test("Resource discoverer multiple resource types") {
     val conf = new SparkConf
     assume(!(Utils.isWindows))
@@ -74,6 +88,13 @@ class ResourceUtilsSuite extends SparkFunSuite
       conf.set(EXECUTOR_FPGA_ID.amountConf, "2")
       conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, fpgaDiscovery)
 
+      // test one with amount 0 to make sure ignored
+      val fooDiscovery = createTempScriptWithExpectedOutput(dir, "fooDiscoverScript",
+        """{"name": "foo", "addresses": ["f1", "f2", "f3"]}""")
+      val fooId = ResourceID(SPARK_EXECUTOR_PREFIX, "foo")
+      conf.set(fooId.amountConf, "0")
+      conf.set(fooId.discoveryScriptConf, fooDiscovery)
+
       val resources = getOrDiscoverAllResources(conf, SPARK_EXECUTOR_PREFIX, None)
       assert(resources.size === 2)
       val gpuValue = resources.get(GPU)