
Commit db46cf7

Author: pgandhi
Merge branch 'master' of https://github.com/pgandhi999/spark into SPARK-27207
[SPARK-27207] : Upmerging with master branch
2 parents: 8f5c6b0 + 25ee047

397 files changed: +7399, -3068 lines


LICENSE-binary

Lines changed: 0 additions & 1 deletion
@@ -302,7 +302,6 @@ com.google.code.gson:gson
 com.google.inject:guice
 com.google.inject.extensions:guice-servlet
 com.twitter:parquet-hadoop-bundle
-commons-beanutils:commons-beanutils-core
 commons-cli:commons-cli
 commons-dbcp:commons-dbcp
 commons-io:commons-io

R/pkg/R/functions.R

Lines changed: 10 additions & 2 deletions
@@ -3589,6 +3589,8 @@ setMethod("element_at",
 
 #' @details
 #' \code{explode}: Creates a new row for each element in the given array or map column.
+#' Uses the default column name \code{col} for elements in the array and
+#' \code{key} and \code{value} for elements in the map unless specified otherwise.
 #'
 #' @rdname column_collection_functions
 #' @aliases explode explode,Column-method
@@ -3649,7 +3651,9 @@ setMethod("sort_array",
 
 #' @details
 #' \code{posexplode}: Creates a new row for each element with position in the given array
-#' or map column.
+#' or map column. Uses the default column name \code{pos} for position, and \code{col}
+#' for elements in the array and \code{key} and \code{value} for elements in the map
+#' unless specified otherwise.
 #'
 #' @rdname column_collection_functions
 #' @aliases posexplode posexplode,Column-method
@@ -3790,7 +3794,8 @@ setMethod("repeat_string",
 #' \code{explode}: Creates a new row for each element in the given array or map column.
 #' Unlike \code{explode}, if the array/map is \code{null} or empty
 #' then \code{null} is produced.
-#'
+#' Uses the default column name \code{col} for elements in the array and
+#' \code{key} and \code{value} for elements in the map unless specified otherwise.
 #'
 #' @rdname column_collection_functions
 #' @aliases explode_outer explode_outer,Column-method
@@ -3815,6 +3820,9 @@ setMethod("explode_outer",
 #' \code{posexplode_outer}: Creates a new row for each element with position in the given
 #' array or map column. Unlike \code{posexplode}, if the array/map is \code{null} or empty
 #' then the row (\code{null}, \code{null}) is produced.
+#' Uses the default column name \code{pos} for position, and \code{col}
+#' for elements in the array and \code{key} and \code{value} for elements in the map
+#' unless specified otherwise.
 #'
 #' @rdname column_collection_functions
 #' @aliases posexplode_outer posexplode_outer,Column-method
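Note: the doc additions above only describe output column names the engine already uses. As a purely illustrative sketch (not part of this commit), the same defaults can be observed from the Scala API:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{explode, posexplode}

    // Illustrative: default column names produced by explode/posexplode on an array column.
    val spark = SparkSession.builder().master("local[1]").appName("explode-defaults").getOrCreate()
    import spark.implicits._

    val df = Seq((1, Seq("a", "b"))).toDF("id", "letters")
    df.select(explode($"letters")).printSchema()     // one column, named "col"
    df.select(posexplode($"letters")).printSchema()  // two columns, named "pos" and "col"
    // For a map column the element columns are named "key" and "value" instead of "col".
    spark.stop()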

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java

Lines changed: 4 additions & 4 deletions
@@ -46,9 +46,9 @@
  */
 public class RetryingBlockFetcherSuite {
 
-  ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
-  ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
-  ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
+  private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
+  private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
+  private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
 
   @Test
   public void testNoFailures() throws IOException, InterruptedException {
@@ -291,7 +291,7 @@ private static void performInteractions(List<? extends Map<String, Object>> inte
     }
 
     assertNotNull(stub);
-    stub.when(fetchStarter).createAndStart(any(), anyObject());
+    stub.when(fetchStarter).createAndStart(any(), any());
     String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
     new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
   }
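Note: anyObject() is deprecated in recent Mockito releases; any() from org.mockito.ArgumentMatchers matches any argument (including null), so the stubbing behaviour should be unchanged. A minimal sketch of the matcher swap, written in Scala for consistency with the other examples here and assuming Mockito 2+ on the classpath:

    import org.mockito.ArgumentMatchers.any
    import org.mockito.Mockito.{mock, when}

    // any() replaces the deprecated anyObject(): both match every argument value.
    val mockList = mock(classOf[java.util.List[String]])
    when(mockList.contains(any())).thenReturn(true)
    assert(mockList.contains("anything"))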

core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -347,7 +347,7 @@
     <dependency>
       <groupId>net.razorvine</groupId>
       <artifactId>pyrolite</artifactId>
-      <version>4.13</version>
+      <version>4.23</version>
       <exclusions>
         <exclusion>
           <groupId>net.razorvine</groupId>

core/src/main/scala/org/apache/spark/BarrierTaskContext.scala

Lines changed: 1 addition & 2 deletions
@@ -20,7 +20,6 @@ package org.apache.spark
 import java.util.{Properties, Timer, TimerTask}
 
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.executor.TaskMetrics
@@ -122,7 +121,7 @@ class BarrierTaskContext private[spark] (
         barrierEpoch),
       // Set a fixed timeout for RPC here, so users shall get a SparkException thrown by
       // BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework.
-      timeout = new RpcTimeout(31536000 /* = 3600 * 24 * 365 */ seconds, "barrierTimeout"))
+      timeout = new RpcTimeout(365.days, "barrierTimeout"))
     barrierEpoch += 1
     logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " +
       "global sync successfully, waited for " +

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 14 additions & 0 deletions
@@ -192,6 +192,20 @@ private[spark] object TestUtils {
     assert(listener.numSpilledStages == 0, s"expected $identifier to not spill, but did")
   }
 
+  /**
+   * Asserts that exception message contains the message. Please note this checks all
+   * exceptions in the tree.
+   */
+  def assertExceptionMsg(exception: Throwable, msg: String): Unit = {
+    var e = exception
+    var contains = e.getMessage.contains(msg)
+    while (e.getCause != null && !contains) {
+      e = e.getCause
+      contains = e.getMessage.contains(msg)
+    }
+    assert(contains, s"Exception tree doesn't contain the expected message: $msg")
+  }
+
   /**
    * Test if a command is available.
    */
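A minimal sketch of how a Spark test might use the new helper (TestUtils is private[spark], so this only compiles inside Spark's own test sources); the expected text may appear anywhere in the cause chain, not just on the outermost exception. The exception messages below are hypothetical:

    import org.apache.spark.TestUtils

    val nested = new RuntimeException("outer failure",
      new IllegalStateException("disk quota exceeded"))  // hypothetical messages
    TestUtils.assertExceptionMsg(nested, "disk quota")    // passes: found on the cause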

core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

Lines changed: 6 additions & 7 deletions
@@ -33,18 +33,17 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
  * A trait for use with reading custom classes in PySpark. Implement this trait and add custom
  * transformation code by overriding the convert method.
  */
-trait Converter[T, + U] extends Serializable {
+trait Converter[-T, +U] extends Serializable {
   def convert(obj: T): U
 }
 
 private[python] object Converter extends Logging {
 
-  def getInstance(converterClass: Option[String],
-      defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
+  def getInstance[T, U](converterClass: Option[String],
+      defaultConverter: Converter[_ >: T, _ <: U]): Converter[T, U] = {
     converterClass.map { cc =>
       Try {
-        val c = Utils.classForName(cc).getConstructor().
-          newInstance().asInstanceOf[Converter[Any, Any]]
+        val c = Utils.classForName[Converter[T, U]](cc).getConstructor().newInstance()
         logInfo(s"Loaded converter: $cc")
         c
       } match {
@@ -177,8 +176,8 @@ private[python] object PythonHadoopUtil {
   * [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa.
   */
  def convertRDD[K, V](rdd: RDD[(K, V)],
-     keyConverter: Converter[Any, Any],
-     valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
+     keyConverter: Converter[K, Any],
+     valueConverter: Converter[V, Any]): RDD[(Any, Any)] = {
    rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
  }
 
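Why the variance annotations help, in a self-contained sketch (the trait is re-declared locally for illustration): with -T/+U, a converter that consumes a wider input type and produces a narrower output type can be supplied wherever a more specific Converter is expected, which is what the new Converter[_ >: T, _ <: U] bound on getInstance relies on.

    // Local re-declaration mirroring the new variance annotations (illustration only).
    trait Converter[-T, +U] extends Serializable {
      def convert(obj: T): U
    }

    // A converter that accepts Any and produces String...
    val stringify = new Converter[Any, String] {
      def convert(obj: Any): String = obj.toString
    }

    // ...is also usable as a Converter with narrower input and wider output types.
    val intToRef: Converter[Int, AnyRef] = stringify
    println(intToRef.convert(42)) // "42"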

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 28 additions & 28 deletions
@@ -24,10 +24,6 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.concurrent.Promise
-import scala.concurrent.duration.Duration
-import scala.language.existentials
-import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
@@ -228,8 +224,8 @@ private[spark] object PythonRDD extends Logging {
       batchSize: Int): JavaRDD[Array[Byte]] = {
     val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
     val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
-    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
-    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+    val kc = Utils.classForName[K](keyClass)
+    val vc = Utils.classForName[V](valueClass)
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
     val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@@ -296,9 +292,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration): RDD[(K, V)] = {
-    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
-    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
-    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName[K](keyClass)
+    val vc = Utils.classForName[V](valueClass)
+    val fc = Utils.classForName[F](inputFormatClass)
     if (path.isDefined) {
       sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
     } else {
@@ -365,9 +361,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
-    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
-    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName[K](keyClass)
+    val vc = Utils.classForName[V](valueClass)
+    val fc = Utils.classForName[F](inputFormatClass)
     if (path.isDefined) {
       sc.sc.hadoopFile(path.get, fc, kc, vc)
     } else {
@@ -425,29 +421,33 @@ private[spark] object PythonRDD extends Logging {
     PythonHadoopUtil.mergeConfs(baseConf, conf)
   }
 
-  private def inferKeyValueTypes[K, V](rdd: RDD[(K, V)], keyConverterClass: String = null,
-      valueConverterClass: String = null): (Class[_], Class[_]) = {
+  private def inferKeyValueTypes[K, V, KK, VV](rdd: RDD[(K, V)], keyConverterClass: String = null,
+      valueConverterClass: String = null): (Class[_ <: KK], Class[_ <: VV]) = {
     // Peek at an element to figure out key/value types. Since Writables are not serializable,
     // we cannot call first() on the converted RDD. Instead, we call first() on the original RDD
     // and then convert locally.
     val (key, value) = rdd.first()
-    val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
-      new JavaToWritableConverter)
+    val (kc, vc) = getKeyValueConverters[K, V, KK, VV](
+      keyConverterClass, valueConverterClass, new JavaToWritableConverter)
     (kc.convert(key).getClass, vc.convert(value).getClass)
   }
 
-  private def getKeyValueTypes(keyClass: String, valueClass: String):
-      Option[(Class[_], Class[_])] = {
+  private def getKeyValueTypes[K, V](keyClass: String, valueClass: String):
      Option[(Class[K], Class[V])] = {
     for {
       k <- Option(keyClass)
       v <- Option(valueClass)
     } yield (Utils.classForName(k), Utils.classForName(v))
   }
 
-  private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
-      defaultConverter: Converter[Any, Any]): (Converter[Any, Any], Converter[Any, Any]) = {
-    val keyConverter = Converter.getInstance(Option(keyConverterClass), defaultConverter)
-    val valueConverter = Converter.getInstance(Option(valueConverterClass), defaultConverter)
+  private def getKeyValueConverters[K, V, KK, VV](
+      keyConverterClass: String,
+      valueConverterClass: String,
+      defaultConverter: Converter[_, _]): (Converter[K, KK], Converter[V, VV]) = {
+    val keyConverter = Converter.getInstance(Option(keyConverterClass),
+      defaultConverter.asInstanceOf[Converter[K, KK]])
+    val valueConverter = Converter.getInstance(Option(valueConverterClass),
+      defaultConverter.asInstanceOf[Converter[V, VV]])
     (keyConverter, valueConverter)
   }
 
@@ -459,7 +459,7 @@ private[spark] object PythonRDD extends Logging {
       keyConverterClass: String,
       valueConverterClass: String,
       defaultConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
-    val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
+    val (kc, vc) = getKeyValueConverters[K, V, Any, Any](keyConverterClass, valueConverterClass,
       defaultConverter)
     PythonHadoopUtil.convertRDD(rdd, kc, vc)
   }
@@ -470,7 +470,7 @@ private[spark] object PythonRDD extends Logging {
   * [[org.apache.hadoop.io.Writable]] types already, since Writables are not Java
   * `Serializable` and we can't peek at them. The `path` can be on any Hadoop file system.
   */
-  def saveAsSequenceFile[K, V, C <: CompressionCodec](
+  def saveAsSequenceFile[C <: CompressionCodec](
      pyRDD: JavaRDD[Array[Byte]],
      batchSerialized: Boolean,
      path: String,
@@ -489,7 +489,7 @@ private[spark] object PythonRDD extends Logging {
   * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
   * this RDD.
   */
-  def saveAsHadoopFile[K, V, F <: OutputFormat[_, _], C <: CompressionCodec](
+  def saveAsHadoopFile[F <: OutputFormat[_, _], C <: CompressionCodec](
      pyRDD: JavaRDD[Array[Byte]],
      batchSerialized: Boolean,
      path: String,
@@ -507,7 +507,7 @@ private[spark] object PythonRDD extends Logging {
     val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new JavaToWritableConverter)
-    val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
+    val fc = Utils.classForName[F](outputFormatClass)
     converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec = codec)
   }
 
@@ -520,7 +520,7 @@ private[spark] object PythonRDD extends Logging {
   * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
   * this RDD.
   */
-  def saveAsNewAPIHadoopFile[K, V, F <: NewOutputFormat[_, _]](
+  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
      pyRDD: JavaRDD[Array[Byte]],
      batchSerialized: Boolean,
      path: String,
@@ -548,7 +548,7 @@ private[spark] object PythonRDD extends Logging {
   * (mapred vs. mapreduce). Keys/values are converted for output using either user specified
   * converters or, by default, [[org.apache.spark.api.python.JavaToWritableConverter]].
   */
-  def saveAsHadoopDataset[K, V](
+  def saveAsHadoopDataset(
      pyRDD: JavaRDD[Array[Byte]],
      batchSerialized: Boolean,
      confAsMap: java.util.HashMap[String, String],
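The repeated Utils.classForName(...).asInstanceOf[Class[T]] casts are replaced by the typed Utils.classForName[T](...) form used throughout the hunks above. A simplified stand-in for that helper (the real Spark utility also consults the context class loader, so this is only a sketch of the pattern):

    // Simplified, illustrative version of a typed class-lookup helper.
    def classForName[C](className: String): Class[C] =
      Class.forName(className).asInstanceOf[Class[C]]

    // Call sites then read like the diff above, with the cast hidden in one place:
    val instantClass: Class[java.time.Instant] = classForName[java.time.Instant]("java.time.Instant")
    println(instantClass.getName) // java.time.Instant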

core/src/main/scala/org/apache/spark/api/r/RBackend.scala

Lines changed: 3 additions & 3 deletions
@@ -30,7 +30,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder
 import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder}
 import io.netty.handler.timeout.ReadTimeoutHandler
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.R._
 
@@ -47,7 +47,7 @@ private[spark] class RBackend {
   private[r] val jvmObjectTracker = new JVMObjectTracker
 
   def init(): (Int, RAuthHelper) = {
-    val conf = new SparkConf()
+    val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
     val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
     bossGroup = new NioEventLoopGroup(conf.get(R_NUM_BACKEND_THREADS))
     val workerGroup = bossGroup
@@ -124,7 +124,7 @@ private[spark] object RBackend extends Logging {
       val listenPort = serverSocket.getLocalPort()
       // Connection timeout is set by socket client. To make it configurable we will pass the
       // timeout value to client inside the temp file
-      val conf = new SparkConf()
+      val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
       val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
 
       // tell the R process via temporary file
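Presumably the point of the change: new SparkConf() only picks up spark.* JVM system properties, whereas SparkEnv.get.conf (when a Spark environment is running) carries the fully resolved configuration, so R backend settings set programmatically are now honoured. The fallback pattern, with an assumed config key and default for illustration:

    import org.apache.spark.{SparkConf, SparkEnv}

    // Prefer the running SparkEnv's resolved conf; fall back to system properties only.
    val conf: SparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
    val timeoutSecs = conf.getInt("spark.r.backendConnectionTimeout", 6000) // key and default assumed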

core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala

Lines changed: 2 additions & 4 deletions
@@ -20,13 +20,11 @@ package org.apache.spark.api.r
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
 import java.util.concurrent.TimeUnit
 
-import scala.language.existentials
-
 import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
 import io.netty.channel.ChannelHandler.Sharable
 import io.netty.handler.timeout.ReadTimeoutException
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.api.r.SerDe._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.R._
@@ -98,7 +96,7 @@ private[r] class RBackendHandler(server: RBackend)
           ctx.write(pingBaos.toByteArray)
         }
       }
-    val conf = new SparkConf()
+    val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
     val heartBeatInterval = conf.get(R_HEARTBEAT_INTERVAL)
     val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
     val interval = Math.min(heartBeatInterval, backendConnectionTimeout - 1)
