
Commit 78c3961

Merge remote-tracking branch 'origin/master' into thread-local-date-format

# Conflicts:
#	sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala

2 parents: 5ae58a3 + a927c76

140 files changed: 853 additions, 619 deletions

common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java

Lines changed: 2 additions & 2 deletions

@@ -347,10 +347,10 @@ public void testRpcHandlerDelegate() throws Exception {
     verify(handler).getStreamManager();
 
     saslHandler.channelInactive(null);
-    verify(handler).channelInactive(any(TransportClient.class));
+    verify(handler).channelInactive(isNull());
 
     saslHandler.exceptionCaught(null, null);
-    verify(handler).exceptionCaught(any(Throwable.class), any(TransportClient.class));
+    verify(handler).exceptionCaught(isNull(), isNull());
   }
 
   @Test
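
This change tracks a Mockito 2 behavior difference: a class-based matcher such as any(TransportClient.class) no longer matches a null argument, so calls that deliberately pass null are now verified with isNull(). A minimal standalone sketch of the difference, assuming mockito-core is on the classpath; the Handler trait below is hypothetical and only stands in for the real RpcHandler:

import org.mockito.ArgumentMatchers.isNull
import org.mockito.Mockito.{mock, verify}

// Hypothetical trait used only for this sketch.
trait Handler {
  def channelInactive(client: AnyRef): Unit
}

object NullMatcherSketch {
  def main(args: Array[String]): Unit = {
    val handler = mock(classOf[Handler])
    handler.channelInactive(null)
    // Under Mockito 2, any(classOf[...]) would not match the null argument; isNull() does.
    verify(handler).channelInactive(isNull())
  }
}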

common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java

Lines changed: 1 addition & 2 deletions

@@ -17,7 +17,6 @@
 
 package org.apache.spark.network.util;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;

@@ -69,7 +68,7 @@ public void testInterception() throws Exception {
       decoder.channelRead(ctx, len);
       decoder.channelRead(ctx, dataBuf);
       verify(interceptor, times(interceptedReads)).handle(any(ByteBuf.class));
-      verify(ctx).fireChannelRead(any(ByteBuffer.class));
+      verify(ctx).fireChannelRead(any(ByteBuf.class));
       assertEquals(0, len.refCnt());
       assertEquals(0, dataBuf.refCnt());
     } finally {

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java

Lines changed: 3 additions & 1 deletion

@@ -27,7 +27,7 @@
 import org.mockito.ArgumentCaptor;
 
 import static org.junit.Assert.*;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
 
 import org.apache.spark.network.buffer.ManagedBuffer;

@@ -79,6 +79,8 @@ public void testRegisterExecutor() {
   @SuppressWarnings("unchecked")
   @Test
   public void testOpenShuffleBlocks() {
+    when(client.getClientId()).thenReturn("app0");
+
     RpcResponseCallback callback = mock(RpcResponseCallback.class);
 
     ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java

Lines changed: 4 additions & 4 deletions

@@ -28,10 +28,10 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
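
The import rewrites in these test suites all track the same Mockito 2 rename: the static matcher factories moved from org.mockito.Matchers (deprecated) to org.mockito.ArgumentMatchers, while eq/any/anyInt/anyLong keep their behavior. A small self-contained sketch of the new import location, again assuming mockito-core is available; the Counter trait is illustrative only, and eq is aliased to avoid clashing with Scala's own eq:

import org.mockito.ArgumentMatchers.{anyInt, eq => meq}
import org.mockito.Mockito.{mock, verify}

// Illustrative trait used only for this sketch.
trait Counter {
  def add(label: String, amount: Int): Unit
}

object ArgumentMatchersSketch {
  def main(args: Array[String]): Unit = {
    val counter = mock(classOf[Counter])
    counter.add("requests", 3)
    // Same matcher semantics as before, just imported from ArgumentMatchers.
    verify(counter).add(meq("requests"), anyInt())
  }
}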

common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java

Lines changed: 21 additions & 10 deletions

@@ -209,22 +209,33 @@ public static long reallocateMemory(long address, long oldSize, long newSize) {
   }
 
   /**
-   * Uses internal JDK APIs to allocate a DirectByteBuffer while ignoring the JVM's
-   * MaxDirectMemorySize limit (the default limit is too low and we do not want to require users
-   * to increase it).
+   * Allocate a DirectByteBuffer, potentially bypassing the JVM's MaxDirectMemorySize limit.
    */
   public static ByteBuffer allocateDirectBuffer(int size) {
     try {
-      long memory = allocateMemory(size);
-      ByteBuffer buffer = (ByteBuffer) DBB_CONSTRUCTOR.newInstance(memory, size);
-      if (CLEANER_CREATE_METHOD != null) {
+      if (CLEANER_CREATE_METHOD == null) {
+        // Can't set a Cleaner (see comments on field), so need to allocate via normal Java APIs
         try {
-          DBB_CLEANER_FIELD.set(buffer,
-            CLEANER_CREATE_METHOD.invoke(null, buffer, (Runnable) () -> freeMemory(memory)));
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new IllegalStateException(e);
+          return ByteBuffer.allocateDirect(size);
+        } catch (OutOfMemoryError oome) {
+          // checkstyle.off: RegexpSinglelineJava
+          throw new OutOfMemoryError("Failed to allocate direct buffer (" + oome.getMessage() +
+            "); try increasing -XX:MaxDirectMemorySize=... to, for example, your heap size");
+          // checkstyle.on: RegexpSinglelineJava
         }
       }
+      // Otherwise, use internal JDK APIs to allocate a DirectByteBuffer while ignoring the JVM's
+      // MaxDirectMemorySize limit (the default limit is too low and we do not want to
+      // require users to increase it).
+      long memory = allocateMemory(size);
+      ByteBuffer buffer = (ByteBuffer) DBB_CONSTRUCTOR.newInstance(memory, size);
+      try {
+        DBB_CLEANER_FIELD.set(buffer,
+          CLEANER_CREATE_METHOD.invoke(null, buffer, (Runnable) () -> freeMemory(memory)));
+      } catch (IllegalAccessException | InvocationTargetException e) {
+        freeMemory(memory);
+        throw new IllegalStateException(e);
+      }
       return buffer;
     } catch (Exception e) {
       throwException(e);
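
The restructured allocateDirectBuffer gains a fallback path: when no Cleaner can be attached, it allocates through the public ByteBuffer.allocateDirect API, which is subject to -XX:MaxDirectMemorySize, and rewraps any OutOfMemoryError with a hint to raise that limit. A minimal sketch of just that fallback pattern in isolation, using illustrative names rather than Spark's internals:

import java.nio.ByteBuffer

object DirectBufferFallbackSketch {
  // Allocate off-heap via the public API; this path respects -XX:MaxDirectMemorySize.
  def allocateWithHint(size: Int): ByteBuffer = {
    try {
      ByteBuffer.allocateDirect(size)
    } catch {
      case oome: OutOfMemoryError =>
        // Rewrap with a message that tells the user which JVM flag to raise.
        throw new OutOfMemoryError(
          s"Failed to allocate direct buffer (${oome.getMessage}); " +
            "try increasing -XX:MaxDirectMemorySize")
    }
  }

  def main(args: Array[String]): Unit = {
    val buf = allocateWithHint(1024)
    println(s"allocated ${buf.capacity()} bytes off-heap")
  }
}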

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 2 additions & 2 deletions

@@ -127,7 +127,7 @@ private[spark] class ExecutorAllocationManager(
   // allocation is only supported for YARN and the default number of cores per executor in YARN is
   // 1, but it might need to be attained differently for different cluster managers
   private val tasksPerExecutorForFullParallelism =
-    conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
+    conf.get(EXECUTOR_CORES) / conf.getInt("spark.task.cpus", 1)
 
   private val executorAllocationRatio =
     conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)

@@ -223,7 +223,7 @@ private[spark] class ExecutorAllocationManager(
         "shuffle service. You may enable this through spark.shuffle.service.enabled.")
     }
     if (tasksPerExecutorForFullParallelism == 0) {
-      throw new SparkException("spark.executor.cores must not be < spark.task.cpus.")
+      throw new SparkException(s"${EXECUTOR_CORES.key} must not be < spark.task.cpus.")
    }
 
     if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
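
Most of the Scala-side changes in this merge replace raw string keys such as "spark.executor.cores" with ConfigEntry constants from org.apache.spark.internal.config, so the key, its type, and its default live in one place and conf.get(entry) returns a typed value. A rough sketch of the pattern, assuming Spark's internal ConfigBuilder API; the entry name below is made up for illustration, and the sketch has to sit under the org.apache.spark package because both ConfigBuilder and conf.get(entry) are private[spark]:

package org.apache.spark.example

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

object TypedConfigSketch {
  // Illustrative entry; the real EXECUTOR_CORES constant lives in org.apache.spark.internal.config.
  private val EXAMPLE_EXECUTOR_CORES = ConfigBuilder("spark.example.executor.cores")
    .intConf
    .createWithDefault(1)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
    // Typed read: returns an Int, and the default comes from the entry itself.
    val cores: Int = conf.get(EXAMPLE_EXECUTOR_CORES)
    println(s"cores = $cores")
  }
}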

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 10 additions & 15 deletions

@@ -503,12 +503,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       logWarning(msg)
     }
 
-    val executorOptsKey = "spark.executor.extraJavaOptions"
-    val executorClasspathKey = "spark.executor.extraClassPath"
-    val driverOptsKey = "spark.driver.extraJavaOptions"
-    val driverClassPathKey = "spark.driver.extraClassPath"
-    val driverLibraryPathKey = "spark.driver.extraLibraryPath"
-    val sparkExecutorInstances = "spark.executor.instances"
+    val executorOptsKey = EXECUTOR_JAVA_OPTIONS.key
 
     // Used by Yarn in 1.1 and before
     sys.props.get("spark.driver.libraryPath").foreach { value =>

@@ -517,7 +512,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
           |spark.driver.libraryPath was detected (set to '$value').
           |This is deprecated in Spark 1.2+.
           |
-          |Please instead use: $driverLibraryPathKey
+          |Please instead use: ${DRIVER_LIBRARY_PATH.key}
         """.stripMargin
       logWarning(warning)
     }

@@ -594,9 +589,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       }
     }
 
-    if (contains("spark.cores.max") && contains("spark.executor.cores")) {
-      val totalCores = getInt("spark.cores.max", 1)
-      val executorCores = getInt("spark.executor.cores", 1)
+    if (contains(CORES_MAX) && contains(EXECUTOR_CORES)) {
+      val totalCores = getInt(CORES_MAX.key, 1)
+      val executorCores = get(EXECUTOR_CORES)
       val leftCores = totalCores % executorCores
       if (leftCores != 0) {
         logWarning(s"Total executor cores: ${totalCores} is not " +

@@ -605,12 +600,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       }
     }
 
-    if (contains("spark.executor.cores") && contains("spark.task.cpus")) {
-      val executorCores = getInt("spark.executor.cores", 1)
+    if (contains(EXECUTOR_CORES) && contains("spark.task.cpus")) {
+      val executorCores = get(EXECUTOR_CORES)
       val taskCpus = getInt("spark.task.cpus", 1)
 
       if (executorCores < taskCpus) {
-        throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.")
+        throw new SparkException(s"${EXECUTOR_CORES.key} must not be less than spark.task.cpus.")
       }
     }
 

@@ -680,7 +675,7 @@ private[spark] object SparkConf extends Logging {
    * TODO: consolidate it with `ConfigBuilder.withAlternative`.
    */
   private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
-    "spark.executor.userClassPathFirst" -> Seq(
+    EXECUTOR_USER_CLASS_PATH_FIRST.key -> Seq(
      AlternateConfig("spark.files.userClassPathFirst", "1.3")),
     UPDATE_INTERVAL_S.key -> Seq(
       AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),

@@ -703,7 +698,7 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
     "spark.shuffle.file.buffer" -> Seq(
       AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
-    "spark.executor.logs.rolling.maxSize" -> Seq(
+    EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> Seq(
      AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
     "spark.io.compression.snappy.blockSize" -> Seq(
       AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 4 deletions

@@ -386,9 +386,9 @@ class SparkContext(config: SparkConf) extends Logging {
     // Set Spark driver host and port system properties. This explicitly sets the configuration
     // instead of relying on the default value of the config constant.
     _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
-    _conf.setIfMissing("spark.driver.port", "0")
+    _conf.setIfMissing(DRIVER_PORT, 0)
 
-    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
+    _conf.set(EXECUTOR_ID, SparkContext.DRIVER_IDENTIFIER)
 
     _jars = Utils.getUserJars(_conf)
     _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))

@@ -461,7 +461,7 @@ class SparkContext(config: SparkConf) extends Logging {
       files.foreach(addFile)
     }
 
-    _executorMemory = _conf.getOption("spark.executor.memory")
+    _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
       .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
       .orElse(Option(System.getenv("SPARK_MEM"))
       .map(warnSparkMem))

@@ -2639,7 +2639,7 @@ object SparkContext extends Logging {
       case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
       case "yarn" =>
         if (conf != null && conf.getOption("spark.submit.deployMode").contains("cluster")) {
-          conf.getInt("spark.driver.cores", 0)
+          conf.getInt(DRIVER_CORES.key, 0)
        } else {
           0
         }

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 4 additions & 4 deletions

@@ -163,10 +163,10 @@ object SparkEnv extends Logging {
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
     assert(conf.contains(DRIVER_HOST_ADDRESS),
       s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
-    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
+    assert(conf.contains(DRIVER_PORT), s"${DRIVER_PORT.key} is not set on the driver!")
     val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
     val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
-    val port = conf.get("spark.driver.port").toInt
+    val port = conf.get(DRIVER_PORT)
     val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
       Some(CryptoStreamUtils.createKey(conf))
     } else {

@@ -251,7 +251,7 @@ object SparkEnv extends Logging {
 
     // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
     if (isDriver) {
-      conf.set("spark.driver.port", rpcEnv.address.port.toString)
+      conf.set(DRIVER_PORT, rpcEnv.address.port)
     }
 
     // Create an instance of the class with the given name, possibly initializing it with our conf

@@ -359,7 +359,7 @@ object SparkEnv extends Logging {
     // We need to set the executor ID before the MetricsSystem is created because sources and
    // sinks specified in the metrics configuration file will want to incorporate this executor's
     // ID into the metrics they report.
-    conf.set("spark.executor.id", executorId)
+    conf.set(EXECUTOR_ID, executorId)
     val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
     ms.start()
     ms

core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 5 additions & 5 deletions

@@ -27,7 +27,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY
+import org.apache.spark.internal.config.EXECUTOR_CORES
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util._
 

@@ -71,11 +72,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 
   private val conf = SparkEnv.get.conf
   private val bufferSize = conf.getInt("spark.buffer.size", 65536)
-  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+  private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
   // each python worker gets an equal part of the allocation. the worker pool will grow to the
   // number of concurrent tasks, which is determined by the number of cores in this executor.
-  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
-    .map(_ / conf.getInt("spark.executor.cores", 1))
+  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY).map(_ / conf.get(EXECUTOR_CORES))
 
   // All the Python functions should have the same exec, version and envvars.
   protected val envVars = funcs.head.funcs.head.envVars

@@ -496,7 +496,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     extends Thread(s"Worker Monitor for $pythonExec") {
 
     /** How long to wait before killing the python worker if a task cannot be interrupted. */
-    private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")
+    private val taskKillTimeout = env.conf.get(PYTHON_TASK_KILL_TIMEOUT)
 
     setDaemon(true)
 
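For the memoryMb change above: the optional PySpark executor memory (PYSPARK_EXECUTOR_MEMORY) is split evenly across the executor's cores, since the worker pool grows to one Python worker per concurrent task. A toy illustration of that arithmetic with assumed example values (4096 MB of PySpark memory, 4 executor cores):

object PysparkWorkerMemorySketch {
  def main(args: Array[String]): Unit = {
    val pysparkMemoryMb: Option[Long] = Some(4096L) // assumed value of the PySpark memory setting
    val executorCores = 4                           // assumed value of spark.executor.cores
    val memoryMbPerWorker = pysparkMemoryMb.map(_ / executorCores)
    println(memoryMbPerWorker) // Some(1024): each Python worker gets an equal share
  }
}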