Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/spark into improve_ts
Browse files Browse the repository at this point in the history
Conflicts:
	sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
  • Loading branch information
Davies Liu committed Jun 19, 2015
2 parents 602b969 + 2c59d5c commit ae5979c
Show file tree
Hide file tree
Showing 265 changed files with 4,908 additions and 2,084 deletions.
1 change: 1 addition & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -950,3 +950,4 @@ The following components are provided under the MIT License. See project link fo
(MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
(The MIT License) Mockito (org.mockito:mockito-all:1.8.5 - http://www.mockito.org)
(MIT License) jquery (https://jquery.org/license/)
(MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs)
2 changes: 1 addition & 1 deletion R/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=R-unit-tests.log
log4j.appender.file.file=R/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

Expand Down
35 changes: 13 additions & 22 deletions build/mvn
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ install_app() {

# Install maven under the build/ folder
install_mvn() {
local MVN_VERSION="3.3.3"

install_app \
"http://archive.apache.org/dist/maven/maven-3/3.2.5/binaries" \
"apache-maven-3.2.5-bin.tar.gz" \
"apache-maven-3.2.5/bin/mvn"
MVN_BIN="${_DIR}/apache-maven-3.2.5/bin/mvn"
"http://archive.apache.org/dist/maven/maven-3/${MVN_VERSION}/binaries" \
"apache-maven-${MVN_VERSION}-bin.tar.gz" \
"apache-maven-${MVN_VERSION}/bin/mvn"

MVN_BIN="${_DIR}/apache-maven-${MVN_VERSION}/bin/mvn"
}

# Install zinc under the build/ folder
Expand Down Expand Up @@ -105,28 +108,16 @@ install_scala() {
SCALA_LIBRARY="$(cd "$(dirname ${scala_bin})/../lib" && pwd)/scala-library.jar"
}

# Determines if a given application is already installed. If not, will attempt
# to install
## Arg1 - application name
## Arg2 - Alternate path to local install under build/ dir
check_and_install_app() {
# create the local environment variable in uppercase
local app_bin="`echo $1 | awk '{print toupper(\$0)}'`_BIN"
# some black magic to set the generated app variable (i.e. MVN_BIN) into the
# environment
eval "${app_bin}=`which $1 2>/dev/null`"

if [ -z "`which $1 2>/dev/null`" ]; then
install_$1
fi
}

# Setup healthy defaults for the Zinc port if none were provided from
# the environment
ZINC_PORT=${ZINC_PORT:-"3030"}

# Check and install all applications necessary to build Spark
check_and_install_app "mvn"
# Install Maven if necessary
MVN_BIN="$(command -v mvn)"

if [ ! "$MVN_BIN" ]; then
install_mvn
fi

# Install the proper version of Scala and Zinc for the build
install_zinc
Expand Down
17 changes: 14 additions & 3 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
// key used to store the spark secret in the Hadoop UGI
private val sparkSecretLookupKey = "sparkCookie"

private val authOn = sparkConf.getBoolean("spark.authenticate", false)
private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
// keep spark.ui.acls.enable for backwards compatibility with 1.0
private var aclsOn =
sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))
Expand Down Expand Up @@ -365,10 +365,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
cookie
} else {
// user must have set spark.authenticate.secret config
sparkConf.getOption("spark.authenticate.secret") match {
// For Master/Worker, auth secret is in conf; for Executors, it is in env variable
sys.env.get(SecurityManager.ENV_AUTH_SECRET)
.orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
case Some(value) => value
case None => throw new Exception("Error: a secret key must be specified via the " +
"spark.authenticate.secret config")
SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
}
}
sCookie
Expand Down Expand Up @@ -449,3 +451,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
override def getSaslUser(appId: String): String = getSaslUser()
override def getSecretKey(appId: String): String = getSecretKey()
}

/**
 * Names of the configuration keys and the environment variable used when looking up
 * authentication settings.
 */
private[spark] object SecurityManager {

// Boolean SparkConf key that switches authentication on; it is read with a default of false.
val SPARK_AUTH_CONF: String = "spark.authenticate"
// SparkConf key holding the authentication secret; consulted when the env variable is unset.
val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
// This is used to set auth secret to an executor's env variable. It should have the same
// value as SPARK_AUTH_SECRET_CONF set in SparkConf
val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
val ow = new ObjectWritable()
ow.setConf(new Configuration())
ow.setConf(new Configuration(false))
ow.readFields(in)
t = ow.get().asInstanceOf[T]
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ private[spark] object SparkConf extends Logging {
def isExecutorStartupConf(name: String): Boolean = {
isAkkaConf(name) ||
name.startsWith("spark.akka") ||
name.startsWith("spark.auth") ||
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.ssl") ||
isSparkPortConf(name)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.util.SerializableJobConf

/**
* Internal helper class that saves an RDD using a Hadoop OutputFormat.
Expand All @@ -42,7 +43,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
with Serializable {

private val now = new Date()
private val conf = new SerializableWritable(jobConf)
private val conf = new SerializableJobConf(jobConf)

private var jobID = 0
private var splitID = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package org.apache.spark.api.python

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SerializableWritable, SparkException}
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.{Logging, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -61,7 +61,7 @@ private[python] object Converter extends Logging {
* Other objects are passed through without conversion.
*/
private[python] class WritableToJavaConverter(
conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
conf: Broadcast[SerializableConfiguration]) extends Converter[Any, Any] {

/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
import org.apache.spark.util.{SerializableConfiguration, Utils}

import scala.util.control.NonFatal

Expand Down Expand Up @@ -445,7 +445,7 @@ private[spark] object PythonRDD extends Logging {
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
Expand All @@ -471,7 +471,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
Expand All @@ -497,7 +497,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
Expand Down Expand Up @@ -540,7 +540,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
Expand All @@ -566,7 +566,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
Expand Down
55 changes: 36 additions & 19 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import org.apache.ivy.core.resolve.ResolveOptions
import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
Expand Down Expand Up @@ -735,8 +736,14 @@ private[spark] object SparkSubmitUtils {
}

/** Path of the local Maven cache. */
private[spark] def m2Path: File = new File(System.getProperty("user.home"),
".m2" + File.separator + "repository" + File.separator)
/**
 * Location of the local Maven repository (`.m2/repository`).
 *
 * While testing, the path is rooted at a relative "dummy" directory instead of the user's
 * home directory.
 */
private[spark] def m2Path: File = {
  // test builds delete the maven cache, and this can cause flakiness
  val parentDir: String = if (Utils.isTesting) "dummy" else System.getProperty("user.home")
  new File(parentDir, ".m2" + File.separator + "repository")
}

/**
* Extracts maven coordinates from a comma-delimited string
Expand All @@ -756,12 +763,13 @@ private[spark] object SparkSubmitUtils {
localM2.setName("local-m2-cache")
cr.add(localM2)

val localIvy = new IBiblioResolver
localIvy.setRoot(new File(ivySettings.getDefaultIvyUserDir,
"local" + File.separator).toURI.toString)
val localIvy = new FileSystemResolver
val localIvyRoot = new File(ivySettings.getDefaultIvyUserDir, "local")
localIvy.setLocal(true)
localIvy.setRepository(new FileRepository(localIvyRoot))
val ivyPattern = Seq("[organisation]", "[module]", "[revision]", "[type]s",
"[artifact](-[classifier]).[ext]").mkString(File.separator)
localIvy.setPattern(ivyPattern)
localIvy.addIvyPattern(localIvyRoot.getAbsolutePath + File.separator + ivyPattern)
localIvy.setName("local-ivy-cache")
cr.add(localIvy)

Expand Down Expand Up @@ -832,11 +840,7 @@ private[spark] object SparkSubmitUtils {
ivyConfName: String,
md: DefaultModuleDescriptor): Unit = {
// Add scala exclusion rule
val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
val scalaDependencyExcludeRule =
new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
scalaDependencyExcludeRule.addConfiguration(ivyConfName)
md.addExcludeRule(scalaDependencyExcludeRule)
md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))

// We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
// other spark-streaming utility components. Underscore is there to differentiate between
Expand All @@ -845,13 +849,8 @@ private[spark] object SparkSubmitUtils {
"sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")

components.foreach { comp =>
val sparkArtifacts =
new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
val sparkDependencyExcludeRule =
new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
sparkDependencyExcludeRule.addConfiguration(ivyConfName)

md.addExcludeRule(sparkDependencyExcludeRule)
md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings,
ivyConfName))
}
}

Expand All @@ -864,13 +863,15 @@ private[spark] object SparkSubmitUtils {
* @param coordinates Comma-delimited string of maven coordinates
* @param remoteRepos Comma-delimited string of remote repositories other than maven central
* @param ivyPath The path to the local ivy repository
* @param exclusions Exclusions to apply when resolving transitive dependencies
* @return The comma-delimited path to the jars of the given maven artifacts including their
* transitive dependencies
*/
def resolveMavenCoordinates(
coordinates: String,
remoteRepos: Option[String],
ivyPath: Option[String],
exclusions: Seq[String] = Nil,
isTest: Boolean = false): String = {
if (coordinates == null || coordinates.trim.isEmpty) {
""
Expand Down Expand Up @@ -928,6 +929,10 @@ private[spark] object SparkSubmitUtils {
// add all supplied maven artifacts as dependencies
addDependenciesToIvy(md, artifacts, ivyConfName)

exclusions.foreach { e =>
md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
}

// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, resolveOptions)
if (rr.hasError) {
Expand All @@ -944,6 +949,18 @@ private[spark] object SparkSubmitUtils {
}
}
}

/**
 * Builds an exclude rule for the module named by a Maven coordinate string.
 *
 * Only the group id and artifact id of the first parsed coordinate are used; the remaining
 * `ArtifactId` fields are set to "*". The rule is registered for the given configuration.
 *
 * @param coords Maven coordinate string, parsed with `extractMavenCoordinates`
 * @param ivySettings settings used to look up the "glob" pattern matcher
 * @param ivyConfName name of the Ivy configuration the rule is added to
 * @return the configured exclude rule
 */
private def createExclusion(
    coords: String,
    ivySettings: IvySettings,
    ivyConfName: String): ExcludeRule = {
  val coordinate = extractMavenCoordinates(coords).apply(0)
  val moduleId = new ModuleId(coordinate.groupId, coordinate.artifactId)
  val artifactId = new ArtifactId(moduleId, "*", "*", "*")
  val excludeRule = new DefaultExcludeRule(artifactId, ivySettings.getMatcher("glob"), null)
  excludeRule.addConfiguration(ivyConfName)
  excludeRule
}

}

/**
Expand Down
Loading

0 comments on commit ae5979c

Please sign in to comment.