
Commit 112ae8f

Marcelo Vanzin committed:

Merge branch 'master' into SPARK-8372

2 parents: 7b91b74 + a48e619


113 files changed: 2739 additions & 1344 deletions


LICENSE

Lines changed: 1 addition & 1 deletion
@@ -948,6 +948,6 @@ The following components are provided under the MIT License. See project link fo
 (MIT License) SLF4J LOG4J-12 Binding (org.slf4j:slf4j-log4j12:1.7.5 - http://www.slf4j.org)
 (MIT License) pyrolite (org.spark-project:pyrolite:2.0.1 - http://pythonhosted.org/Pyro4/)
 (MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
-(The MIT License) Mockito (org.mockito:mockito-core:1.8.5 - http://www.mockito.org)
+(The MIT License) Mockito (org.mockito:mockito-core:1.9.5 - http://www.mockito.org)
 (MIT License) jquery (https://jquery.org/license/)
 (MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs)

R/pkg/R/DataFrame.R

Lines changed: 2 additions & 2 deletions
@@ -169,8 +169,8 @@ setMethod("isLocal",
 #'}
 setMethod("showDF",
           signature(x = "DataFrame"),
-          function(x, numRows = 20) {
-            s <- callJMethod(x@sdf, "showString", numToInt(numRows))
+          function(x, numRows = 20, truncate = TRUE) {
+            s <- callJMethod(x@sdf, "showString", numToInt(numRows), truncate)
             cat(s)
           })
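For reference, the new truncate argument is forwarded straight to the JVM-side showString call. A hedged Scala sketch of the equivalent behaviour, assuming the spark-shell-provided sqlContext and the show(numRows, truncate) overload that backs showString in this branch:

// Hypothetical spark-shell snippet; names and data are illustrative.
val df = sqlContext.createDataFrame(Seq(("a" * 50, 1), ("b" * 50, 2))).toDF("text", "n")
df.show(10, false)   // truncate = false: print long cell values in full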

core/pom.xml

Lines changed: 0 additions & 10 deletions
@@ -69,16 +69,6 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.jackson</groupId>
-          <artifactId>jackson-mapper-asl</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>

core/src/main/scala/org/apache/spark/SSLOptions.scala

Lines changed: 38 additions & 5 deletions
@@ -17,7 +17,9 @@
 
 package org.apache.spark
 
-import java.io.File
+import java.io.{File, FileInputStream}
+import java.security.{KeyStore, NoSuchAlgorithmException}
+import javax.net.ssl.{KeyManager, KeyManagerFactory, SSLContext, TrustManager, TrustManagerFactory}
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
 import org.eclipse.jetty.util.ssl.SslContextFactory
@@ -38,7 +40,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
  * @param trustStore a path to the trust-store file
  * @param trustStorePassword a password to access the trust-store file
  * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
- * @param enabledAlgorithms a set of encryption algorithms to use
+ * @param enabledAlgorithms a set of encryption algorithms that may be used
  */
 private[spark] case class SSLOptions(
     enabled: Boolean = false,
@@ -48,7 +50,8 @@ private[spark] case class SSLOptions(
     trustStore: Option[File] = None,
     trustStorePassword: Option[String] = None,
     protocol: Option[String] = None,
-    enabledAlgorithms: Set[String] = Set.empty) {
+    enabledAlgorithms: Set[String] = Set.empty)
+    extends Logging {
 
   /**
   * Creates a Jetty SSL context factory according to the SSL settings represented by this object.
@@ -63,7 +66,7 @@ private[spark] case class SSLOptions(
     trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
     keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
     protocol.foreach(sslContextFactory.setProtocol)
-    sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)
+    sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*)
 
     Some(sslContextFactory)
   } else {
@@ -94,14 +97,44 @@ private[spark] case class SSLOptions(
         .withValue("akka.remote.netty.tcp.security.protocol",
           ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
         .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
-          ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq))
+          ConfigValueFactory.fromIterable(supportedAlgorithms.toSeq))
         .withValue("akka.remote.netty.tcp.enable-ssl",
           ConfigValueFactory.fromAnyRef(true)))
     } else {
       None
     }
   }
 
+  /*
+   * The supportedAlgorithms set is a subset of the enabledAlgorithms that
+   * are supported by the current Java security provider for this protocol.
+   */
+  private val supportedAlgorithms: Set[String] = {
+    var context: SSLContext = null
+    try {
+      context = SSLContext.getInstance(protocol.orNull)
+      /* The set of supported algorithms does not depend upon the keys, trust, or
+         rng, although they will influence which algorithms are eventually used. */
+      context.init(null, null, null)
+    } catch {
+      case npe: NullPointerException =>
+        logDebug("No SSL protocol specified")
+        context = SSLContext.getDefault
+      case nsa: NoSuchAlgorithmException =>
+        logDebug(s"No support for requested SSL protocol ${protocol.get}")
+        context = SSLContext.getDefault
+    }
+
+    val providerAlgorithms = context.getServerSocketFactory.getSupportedCipherSuites.toSet
+
+    // Log which algorithms we are discarding
+    (enabledAlgorithms &~ providerAlgorithms).foreach { cipher =>
+      logDebug(s"Discarding unsupported cipher $cipher")
+    }
+
+    enabledAlgorithms & providerAlgorithms
+  }
+
   /** Returns a string representation of this SSLOptions with all the passwords masked. */
   override def toString: String = s"SSLOptions{enabled=$enabled, " +
       s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 8 additions & 0 deletions
@@ -315,6 +315,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _dagScheduler = ds
   }
 
+  /**
+   * A unique identifier for the Spark application.
+   * Its format depends on the scheduler implementation.
+   * (i.e.
+   *  in case of local spark app something like 'local-1433865536131'
+   *  in case of YARN something like 'application_1433865536131_34483'
+   * )
+   */
   def applicationId: String = _applicationId
   def applicationAttemptId: Option[String] = _applicationAttemptId
 
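A quick sketch of where the documented identifier formats surface; the timestamps and IDs in the comments are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object AppIdSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("app-id-demo"))
    // In local mode this prints something like "local-1433865536131";
    // on YARN the same call would return e.g. "application_1433865536131_34483".
    println(sc.applicationId)
    sc.stop()
  }
}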

core/src/main/scala/org/apache/spark/api/r/RRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -391,7 +391,7 @@ private[r] object RRDD {
   }
 
   private def createRProcess(rLibDir: String, port: Int, script: String): BufferedStreamThread = {
-    val rCommand = "Rscript"
+    val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript")
     val rOptions = "--vanilla"
     val rExecScript = rLibDir + "/SparkR/worker/" + script
     val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript))
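With this change the R worker executable becomes configurable instead of being hard-coded to "Rscript". A small configuration fragment sketching how the new setting could be supplied (the path is a placeholder, not a Spark default):

import org.apache.spark.SparkConf

// Point SparkR's worker processes at a non-default R binary.
val conf = new SparkConf()
  .setAppName("sparkr-custom-rscript")
  .set("spark.sparkr.r.command", "/opt/R/bin/Rscript")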

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 23 additions & 14 deletions
@@ -756,6 +756,20 @@ private[spark] object SparkSubmitUtils {
     val cr = new ChainResolver
     cr.setName("list")
 
+    val repositoryList = remoteRepos.getOrElse("")
+    // add any other remote repositories other than maven central
+    if (repositoryList.trim.nonEmpty) {
+      repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
+        val brr: IBiblioResolver = new IBiblioResolver
+        brr.setM2compatible(true)
+        brr.setUsepoms(true)
+        brr.setRoot(repo)
+        brr.setName(s"repo-${i + 1}")
+        cr.add(brr)
+        printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
+      }
+    }
+
     val localM2 = new IBiblioResolver
     localM2.setM2compatible(true)
     localM2.setRoot(m2Path.toURI.toString)
@@ -786,20 +800,6 @@ private[spark] object SparkSubmitUtils {
     sp.setRoot("http://dl.bintray.com/spark-packages/maven")
     sp.setName("spark-packages")
     cr.add(sp)
-
-    val repositoryList = remoteRepos.getOrElse("")
-    // add any other remote repositories other than maven central
-    if (repositoryList.trim.nonEmpty) {
-      repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
-        val brr: IBiblioResolver = new IBiblioResolver
-        brr.setM2compatible(true)
-        brr.setUsepoms(true)
-        brr.setRoot(repo)
-        brr.setName(s"repo-${i + 1}")
-        cr.add(brr)
-        printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
-      }
-    }
     cr
   }
 
@@ -922,6 +922,15 @@ private[spark] object SparkSubmitUtils {
 
     // A Module descriptor must be specified. Entries are dummy strings
     val md = getModuleDescriptor
+    // clear ivy resolution from previous launches. The resolution file is usually at
+    // ~/.ivy2/org.apache.spark-spark-submit-parent-default.xml. In between runs, this file
+    // leads to confusion with Ivy when the files can no longer be found at the repository
+    // declared in that file/
+    val mdId = md.getModuleRevisionId
+    val previousResolution = new File(ivySettings.getDefaultCache,
+      s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml")
+    if (previousResolution.exists) previousResolution.delete
+
     md.setDefaultConf(ivyConfName)
 
     // Add exclusion rules for Spark and Scala Library
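The first two hunks move the user-supplied repository list (typically passed via spark-submit's --repositories option) to the front of the resolver chain, so those repositories are consulted before the built-in ones. A hedged sketch of how the comma-separated list maps onto resolver names; the URLs are placeholders:

object RepoOrderSketch {
  def main(args: Array[String]): Unit = {
    // A hypothetical value for the remoteRepos argument.
    val repositoryList = "https://repo.example.com/releases,https://repo.example.com/snapshots"
    repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
      // Each entry becomes an IBiblioResolver named repo-1, repo-2, ... and, after this
      // change, sits ahead of the local m2 cache, Maven Central and spark-packages,
      // so user-specified repositories win during dependency resolution.
      println(s"$repo added as a remote repository with the name: repo-${i + 1}")
    }
  }
}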

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 33 additions & 0 deletions
@@ -2333,3 +2333,36 @@ private[spark] class RedirectThread(
     }
   }
 }
+
+/**
+ * An [[OutputStream]] that will store the last 10 kilobytes (by default) written to it
+ * in a circular buffer. The current contents of the buffer can be accessed using
+ * the toString method.
+ */
+private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
+  var pos: Int = 0
+  var buffer = new Array[Int](sizeInBytes)
+
+  def write(i: Int): Unit = {
+    buffer(pos) = i
+    pos = (pos + 1) % buffer.length
+  }
+
+  override def toString: String = {
+    val (end, start) = buffer.splitAt(pos)
+    val input = new java.io.InputStream {
+      val iterator = (start ++ end).iterator
+
+      def read(): Int = if (iterator.hasNext) iterator.next() else -1
+    }
+    val reader = new BufferedReader(new InputStreamReader(input))
+    val stringBuilder = new StringBuilder
+    var line = reader.readLine()
+    while (line != null) {
+      stringBuilder.append(line)
+      stringBuilder.append("\n")
+      line = reader.readLine()
+    }
+    stringBuilder.toString()
+  }
+}
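A small usage sketch of the new helper. Since CircularBuffer is private[spark], the sketch lives inside Spark's own package; the size and data are illustrative:

package org.apache.spark.util   // needed because CircularBuffer is private[spark]

import java.io.PrintStream

object CircularBufferSketch {
  def main(args: Array[String]): Unit = {
    // Keep only the last ~4 KB written by a chatty producer.
    val tail = new CircularBuffer(sizeInBytes = 4096)
    val out = new PrintStream(tail)
    (1 to 100000).foreach(i => out.println(s"log line $i"))
    // toString reassembles the ring buffer contents in write order.
    println(tail.toString)
  }
}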

core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala

Lines changed: 14 additions & 6 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.io.File
+import javax.net.ssl.SSLContext
 
 import com.google.common.io.Files
 import org.apache.spark.util.Utils
@@ -29,16 +30,24 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
     val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
 
+    // Pick two cipher suites that the provider knows about
+    val sslContext = SSLContext.getInstance("TLSv1.2")
+    sslContext.init(null, null, null)
+    val algorithms = sslContext
+      .getServerSocketFactory
+      .getDefaultCipherSuites
+      .take(2)
+      .toSet
+
     val conf = new SparkConf
     conf.set("spark.ssl.enabled", "true")
     conf.set("spark.ssl.keyStore", keyStorePath)
     conf.set("spark.ssl.keyStorePassword", "password")
     conf.set("spark.ssl.keyPassword", "password")
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
-    conf.set("spark.ssl.enabledAlgorithms",
-      "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
-    conf.set("spark.ssl.protocol", "SSLv3")
+    conf.set("spark.ssl.enabledAlgorithms", algorithms.mkString(","))
+    conf.set("spark.ssl.protocol", "TLSv1.2")
 
     val opts = SSLOptions.parse(conf, "spark.ssl")
 
@@ -52,9 +61,8 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
     assert(opts.trustStorePassword === Some("password"))
     assert(opts.keyStorePassword === Some("password"))
     assert(opts.keyPassword === Some("password"))
-    assert(opts.protocol === Some("SSLv3"))
-    assert(opts.enabledAlgorithms ===
-      Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
+    assert(opts.protocol === Some("TLSv1.2"))
+    assert(opts.enabledAlgorithms === algorithms)
   }
 
   test("test resolving property with defaults specified ") {

core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala

Lines changed: 18 additions & 6 deletions
@@ -25,6 +25,20 @@ object SSLSampleConfigs {
     this.getClass.getResource("/untrusted-keystore").toURI).getAbsolutePath
   val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
 
+  val enabledAlgorithms =
+    // A reasonable set of TLSv1.2 Oracle security provider suites
+    "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384, " +
+    "TLS_RSA_WITH_AES_256_CBC_SHA256, " +
+    "TLS_DHE_RSA_WITH_AES_256_CBC_SHA256, " +
+    "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256, " +
+    "TLS_DHE_RSA_WITH_AES_128_CBC_SHA256, " +
+    // and their equivalent names in the IBM Security provider
+    "SSL_ECDHE_RSA_WITH_AES_256_CBC_SHA384, " +
+    "SSL_RSA_WITH_AES_256_CBC_SHA256, " +
+    "SSL_DHE_RSA_WITH_AES_256_CBC_SHA256, " +
+    "SSL_ECDHE_RSA_WITH_AES_128_CBC_SHA256, " +
+    "SSL_DHE_RSA_WITH_AES_128_CBC_SHA256"
+
   def sparkSSLConfig(): SparkConf = {
     val conf = new SparkConf(loadDefaults = false)
     conf.set("spark.ssl.enabled", "true")
@@ -33,9 +47,8 @@ object SSLSampleConfigs {
     conf.set("spark.ssl.keyPassword", "password")
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
-    conf.set("spark.ssl.enabledAlgorithms",
-      "SSL_RSA_WITH_RC4_128_SHA, SSL_RSA_WITH_DES_CBC_SHA")
-    conf.set("spark.ssl.protocol", "TLSv1")
+    conf.set("spark.ssl.enabledAlgorithms", enabledAlgorithms)
+    conf.set("spark.ssl.protocol", "TLSv1.2")
     conf
   }
 
@@ -47,9 +60,8 @@ object SSLSampleConfigs {
     conf.set("spark.ssl.keyPassword", "password")
     conf.set("spark.ssl.trustStore", trustStorePath)
    conf.set("spark.ssl.trustStorePassword", "password")
-    conf.set("spark.ssl.enabledAlgorithms",
-      "SSL_RSA_WITH_RC4_128_SHA, SSL_RSA_WITH_DES_CBC_SHA")
-    conf.set("spark.ssl.protocol", "TLSv1")
+    conf.set("spark.ssl.enabledAlgorithms", enabledAlgorithms)
+    conf.set("spark.ssl.protocol", "TLSv1.2")
     conf
   }
 
