merge upstream changes #3

Merged
18 commits merged on Sep 19, 2014

Commits (18)
b3830b2
Docs: move HA subsections to a deeper indentation level
ash211 Sep 17, 2014
7fc3bb7
[SPARK-3534] Fix expansion of testing arguments to sbt
nchammas Sep 17, 2014
cbc0650
[SPARK-3571] Spark standalone cluster mode doesn't work.
sarutak Sep 17, 2014
6688a26
[SPARK-3564][WebUI] Display App ID on HistoryPage
sarutak Sep 17, 2014
1147973
[SPARK-3567] appId field in SparkDeploySchedulerBackend should be vol…
sarutak Sep 17, 2014
3f169bf
[SPARK-3565]Fix configuration item not consistent with document
WangTaoTheTonic Sep 18, 2014
5547fa1
[SPARK-3534] Add hive-thriftserver to SQL tests
nchammas Sep 18, 2014
6772afe
[Minor] rat exclude dependency-reduced-pom.xml
witgo Sep 18, 2014
3447d10
[SPARK-3547]Using a special exit code instead of 1 to represent Class…
WangTaoTheTonic Sep 18, 2014
3ad4176
SPARK-3579 Jekyll doc generation is different across environments.
pwendell Sep 18, 2014
6cab838
[SPARK-3566] [BUILD] .gitignore and .rat-excludes should consider Win…
sarutak Sep 18, 2014
471e6a3
[SPARK-3589][Minor]remove redundant code
WangTaoTheTonic Sep 18, 2014
b3ed37e
[SPARK-3560] Fixed setting spark.jars system property in yarn-cluster…
Victsm Sep 18, 2014
9306297
[Minor Hot Fix] Move a line in SparkSubmit to the right place
andrewor14 Sep 19, 2014
e77fa81
[SPARK-3554] [PySpark] use broadcast automatically for large closure
davies Sep 19, 2014
e76ef5c
[SPARK-3418] Sparse Matrix support (CCS) and additional native BLAS o…
brkyvz Sep 19, 2014
3bbbdd8
[SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc
larryxiao Sep 19, 2014
a48956f
MAINTENANCE: Automated closing of pull requests.
pwendell Sep 19, 2014
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,4 +1,6 @@
*~
*.#*
*#*#
*.swp
*.ipr
*.iml
@@ -16,6 +18,7 @@ third_party/libmesos.so
third_party/libmesos.dylib
conf/java-opts
conf/*.sh
conf/*.cmd
conf/*.properties
conf/*.conf
conf/*.xml
2 changes: 2 additions & 0 deletions .rat-excludes
@@ -20,6 +20,7 @@ log4j.properties.template
metrics.properties.template
slaves
spark-env.sh
spark-env.cmd
spark-env.sh.template
log4j-defaults.properties
bootstrap-tooltip.js
@@ -58,3 +59,4 @@ dist/*
.*iws
logs
.*scalastyle-output.xml
.*dependency-reduced-pom.xml
2 changes: 1 addition & 1 deletion bin/spark-sql
@@ -24,7 +24,7 @@
set -o posix

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
CLASS_NOT_FOUND_EXIT_STATUS=1
CLASS_NOT_FOUND_EXIT_STATUS=101

# Figure out where Spark is installed
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
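The exit-status change above (1 becomes 101) lets a caller tell "the SQL CLI class is missing from this build" apart from an ordinary failure. A minimal wrapper sketch, assuming bin/spark-sql propagates the new code unchanged; the rebuild hint is an illustration, not part of this change:

```bash
#!/usr/bin/env bash
# Hypothetical wrapper around bin/spark-sql that branches on the new exit code.
./bin/spark-sql "$@"
status=$?
if [ "$status" -eq 101 ]; then
  echo "SparkSQLCLIDriver not found; rebuild Spark with the Hive/Thrift-server profiles (e.g. -Phive -Phive-thriftserver)" >&2
elif [ "$status" -ne 0 ]; then
  echo "spark-sql exited with status $status" >&2
fi
exit "$status"
```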
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -54,7 +54,7 @@ object SparkSubmit {
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"

private val CLASS_NOT_FOUND_EXIT_STATUS = 1
private val CLASS_NOT_FOUND_EXIT_STATUS = 101

// Exposed for testing
private[spark] var exitFn: () => Unit = () => System.exit(-1)
@@ -172,7 +172,7 @@ object SparkSubmit {
// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -183,6 +183,7 @@
sysProp = "spark.driver.extraLibraryPath"),

// Standalone cluster only
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),

@@ -261,7 +262,7 @@ object SparkSubmit {
}

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (clusterManager == YARN && deployMode == CLUSTER) {
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
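With the reshuffled OptionAssigners, --jars is propagated as the spark.jars property only in client mode and in standalone cluster mode, rather than unconditionally, which had broken yarn-cluster submission; yarn-cluster detection now goes through the isYarnCluster flag. A hedged example of a standalone cluster-mode submission that exercises this path (master URL, class name, and jar paths are placeholders):

```bash
# Placeholder master URL, class, and jar paths; flags are standard spark-submit options.
./bin/spark-submit \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --class com.example.MyApp \
  --jars /path/to/dep-one.jar,/path/to/dep-two.jar \
  /path/to/my-app.jar arg1 arg2
```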
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -67,6 +67,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
}

private val appHeader = Seq(
"App ID",
"App Name",
"Started",
"Completed",
@@ -81,7 +82,8 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
val duration = UIUtils.formatDuration(info.endTime - info.startTime)
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{info.name}</a></td>
<td><a href={uiAddress}>{info.id}</a></td>
<td>{info.name}</td>
<td>{startTime}</td>
<td>{endTime}</td>
<td>{duration}</td>
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -489,23 +489,24 @@ private[spark] class Master(
// First schedule drivers, they take strict precedence over applications
// Randomization helps balance drivers
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val aliveWorkerNum = shuffledAliveWorkers.size
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0

for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
curPos = (curPos + 1) % aliveWorkerNum
val startPos = curPos
var launched = false
while (curPos != startPos && !launched) {
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % aliveWorkerNum
curPos = (curPos + 1) % numWorkersAlive
}
}

core/src/main/scala/org/apache/spark/network/nio/Connection.scala
@@ -460,7 +460,7 @@ private[spark] class ReceivingConnection(
if (currId != null) currId else super.getRemoteConnectionManagerId()
}

// The reciever's remote address is the local socket on remote side : which is NOT
// The receiver's remote address is the local socket on remote side : which is NOT
// the connection manager id of the receiver.
// We infer that from the messages we receive on the receiver socket.
private def processConnectionManagerId(header: MessageChunkHeader) {
core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -501,7 +501,7 @@ private[nio] class ConnectionManager(

def changeConnectionKeyInterest(connection: Connection, ops: Int) {
keyInterestChangeRequests += ((connection.key, ops))
// so that registerations happen !
// so that registrations happen !
wakeupSelector()
}

@@ -832,7 +832,7 @@ private[nio] class ConnectionManager(
}

/**
* Send a message and block until an acknowldgment is received or an error occurs.
* Send a message and block until an acknowledgment is received or an error occurs.
* @param connectionManagerId the message's destination
* @param message the message being sent
* @return a Future that either returns the acknowledgment message or captures an exception.
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -34,7 +34,7 @@ private[spark] class SparkDeploySchedulerBackend(
var client: AppClient = null
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
var appId: String = _
@volatile var appId: String = _

val registrationLock = new Object()
var registrationDone = false
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1382,15 +1382,15 @@ private[spark] object Utils extends Logging {
}

/**
* Default number of retries in binding to a port.
* Default maximum number of retries when binding to a port before giving up.
*/
val portMaxRetries: Int = {
if (sys.props.contains("spark.testing")) {
// Set a higher number of retries for tests...
sys.props.get("spark.ports.maxRetries").map(_.toInt).getOrElse(100)
sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
} else {
Option(SparkEnv.get)
.flatMap(_.conf.getOption("spark.ports.maxRetries"))
.flatMap(_.conf.getOption("spark.port.maxRetries"))
.map(_.toInt)
.getOrElse(16)
}
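The property read here now matches the documented name, spark.port.maxRetries (singular "port"). A hedged example of raising the limit at submission time; the class and jar are placeholders:

```bash
# Raise the number of port-binding retries for services started by this application.
./bin/spark-submit \
  --class com.example.MyApp \
  --conf spark.port.maxRetries=32 \
  /path/to/my-app.jar
```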
core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -115,11 +115,13 @@ class JsonProtocolSuite extends FunSuite {
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
workerInfo
}

def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, ExecutorState.RUNNING)
}

def createDriverRunner(): DriverRunner = {
new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
createDriverDesc(), null, "akka://worker")
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -154,6 +154,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
sysProps("spark.app.name") should be ("beauty")
sysProps("spark.shuffle.spill") should be ("false")
sysProps("SPARK_SUBMIT") should be ("true")
sysProps.keys should not contain ("spark.jars")
}

test("handles YARN client mode") {
11 changes: 7 additions & 4 deletions dev/run-tests
@@ -141,25 +141,28 @@ echo "========================================================================="

{
# If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled.
# This must be a single argument, as it is.
if [ -n "$_RUN_SQL_TESTS" ]; then
SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive"
fi

if [ -n "$_SQL_TESTS_ONLY" ]; then
SBT_MAVEN_TEST_ARGS="catalyst/test sql/test hive/test"
# This must be an array of individual arguments. Otherwise, having one long string
#+ will be interpreted as a single test, which doesn't work.
SBT_MAVEN_TEST_ARGS=("catalyst/test" "sql/test" "hive/test" "hive-thriftserver/test")
else
SBT_MAVEN_TEST_ARGS="test"
SBT_MAVEN_TEST_ARGS=("test")
fi

echo "[info] Running Spark tests with these arguments: $SBT_MAVEN_PROFILES_ARGS $SBT_MAVEN_TEST_ARGS"
echo "[info] Running Spark tests with these arguments: $SBT_MAVEN_PROFILES_ARGS ${SBT_MAVEN_TEST_ARGS[@]}"

# NOTE: echo "q" is needed because sbt on encountering a build file with failure
#+ (either resolution or compilation) prompts the user for input either q, r, etc
#+ to quit or retry. This echo is there to make it not block.
# QUESTION: Why doesn't 'yes "q"' work?
# QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work?
echo -e "q\n" \
| sbt/sbt "$SBT_MAVEN_PROFILES_ARGS" "$SBT_MAVEN_TEST_ARGS" \
| sbt/sbt "$SBT_MAVEN_PROFILES_ARGS" "${SBT_MAVEN_TEST_ARGS[@]}" \
| grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
}

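The switch from one long string to a bash array matters because "${SBT_MAVEN_TEST_ARGS[@]}" expands to one word per element, while "$SBT_MAVEN_TEST_ARGS" is handed to sbt as a single argument (and hence a single, nonexistent test target). A standalone sketch of the difference, using a stand-in function instead of sbt:

```bash
#!/usr/bin/env bash
# print_args stands in for sbt, just to show how the arguments split.
print_args() { printf 'got %d argument(s):' "$#"; printf ' [%s]' "$@"; echo; }

TESTS_STRING="catalyst/test sql/test hive/test"
print_args "$TESTS_STRING"       # got 1 argument(s): [catalyst/test sql/test hive/test]

TESTS_ARRAY=("catalyst/test" "sql/test" "hive/test")
print_args "${TESTS_ARRAY[@]}"   # got 3 argument(s): [catalyst/test] [sql/test] [hive/test]
```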
16 changes: 10 additions & 6 deletions docs/README.md
@@ -20,12 +20,16 @@ In this directory you will find textfiles formatted using Markdown, with an ".md"
read those text files directly if you want. Start with index.md.

The markdown code can be compiled to HTML using the [Jekyll tool](http://jekyllrb.com).
To use the `jekyll` command, you will need to have Jekyll installed.
The easiest way to do this is via a Ruby Gem, see the
[jekyll installation instructions](http://jekyllrb.com/docs/installation).
If not already installed, you need to install `kramdown` and `jekyll-redirect-from` Gems
with `sudo gem install kramdown jekyll-redirect-from`.
Execute `jekyll build` from the `docs/` directory. Compiling the site with Jekyll will create a directory
`Jekyll` and a few dependencies must be installed for this to work. We recommend
installing via the Ruby Gem dependency manager. Since the exact HTML output
varies between versions of Jekyll and its dependencies, we list specific versions here
in some cases:

$ sudo gem install jekyll -v 1.4.3
$ sudo gem uninstall kramdown -v 1.4.1
$ sudo gem install jekyll-redirect-from

Execute `jekyll` from the `docs/` directory. Compiling the site with Jekyll will create a directory
called `_site` containing index.html as well as the rest of the compiled files.

You can modify the default Jekyll build as follows:
5 changes: 5 additions & 0 deletions docs/_config.yml
@@ -3,6 +3,11 @@ markdown: kramdown
gems:
- jekyll-redirect-from

# For some reason kramdown seems to behave differently on different
# OS/packages wrt encoding. So we hard code this config.
kramdown:
entity_output: numeric

# These allow the documentation to be updated with nerw releases
# of Spark, Scala, and Mesos.
SPARK_VERSION: 1.0.0-SNAPSHOT
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -657,7 +657,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.port.maxRetries</code></td>
<td>16</td>
<td>
Maximum number of retries when binding to a port before giving up.
Default maximum number of retries when binding to a port before giving up.
</td>
</tr>
<tr>
4 changes: 2 additions & 2 deletions docs/spark-standalone.md
@@ -307,7 +307,7 @@ tight firewall settings. For a complete list of ports to configure, see the

By default, standalone scheduling clusters are resilient to Worker failures (insofar as Spark itself is resilient to losing work by moving it to other workers). However, the scheduler uses a Master to make scheduling decisions, and this (by default) creates a single point of failure: if the Master crashes, no new applications can be created. In order to circumvent this, we have two high availability schemes, detailed below.

# Standby Masters with ZooKeeper
## Standby Masters with ZooKeeper

**Overview**

@@ -347,7 +347,7 @@ There's an important distinction to be made between "registering with a Master"

Due to this property, new Masters can be created at any time, and the only thing you need to worry about is that _new_ applications and Workers can find it to register with in case it becomes the leader. Once registered, you're taken care of.

# Single-Node Recovery with Local File System
## Single-Node Recovery with Local File System

**Overview**

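The re-leveled headings cover the two recovery modes of the standalone Master. For orientation, a sketch of how each mode is typically enabled in conf/spark-env.sh, assuming the standard spark.deploy.* properties documented on this page; host names and paths are placeholders:

```bash
# Standby Masters with ZooKeeper (placeholder ZooKeeper quorum).
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 -Dspark.deploy.zookeeper.dir=/spark"

# Or: single-node recovery with the local file system (placeholder directory).
# export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/var/spark/recovery"
```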
4 changes: 2 additions & 2 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -392,7 +392,7 @@ object VertexRDD {
*/
def apply[VD: ClassTag](
vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = {
VertexRDD(vertices, edges, defaultVal, (a, b) => b)
VertexRDD(vertices, edges, defaultVal, (a, b) => a)
}

/**
@@ -419,7 +419,7 @@
(vertexIter, routingTableIter) =>
val routingTable =
if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal))
Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal, mergeFunc))
}
new VertexRDD(vertexPartitions)
}
graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -36,18 +36,36 @@ private[graphx]
object ShippableVertexPartition {
/** Construct a `ShippableVertexPartition` from the given vertices without any routing table. */
def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): ShippableVertexPartition[VD] =
apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD])
apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD], (a, b) => a)

/**
* Construct a `ShippableVertexPartition` from the given vertices with the specified routing
* table, filling in missing vertices mentioned in the routing table using `defaultVal`.
*/
def apply[VD: ClassTag](
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
: ShippableVertexPartition[VD] = {
val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, (a: VD, b: VD) => a)
new ShippableVertexPartition(index, values, mask, routingTable)
: ShippableVertexPartition[VD] =
apply(iter, routingTable, defaultVal, (a, b) => a)

/**
* Construct a `ShippableVertexPartition` from the given vertices with the specified routing
* table, filling in missing vertices mentioned in the routing table using `defaultVal`,
* and merging duplicate vertex atrribute with mergeFunc.
*/
def apply[VD: ClassTag](
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
// Merge the given vertices using mergeFunc
iter.foreach { pair =>
map.setMerge(pair._1, pair._2, mergeFunc)
}
// Fill in missing vertices mentioned in the routing table
routingTable.iterator.foreach { vid =>
map.changeValue(vid, defaultVal, identity)
}

new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
}

import scala.language.implicitConversions
11 changes: 11 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -99,4 +99,15 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
}
}

test("mergeFunc") {
// test to see if the mergeFunc is working correctly
withSpark { sc =>
val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
// test merge function
assert(rdd.collect.toSet == Set((0L, 0), (1L, 3), (2L, 9)))
}
}

}