
Commit 9a66cb0

resolving merge conflicts
*fingers crossed* I admit I’m not exactly sure how this works… Let’s see if I did the right thing.
1 parent a31ccc4 commit 9a66cb0

132 files changed (+4986, -263 lines). Only a subset of the changed files is shown below.

LICENSE

+1
@@ -549,3 +549,4 @@ The following components are provided under the MIT License. See project link fo
 (MIT License) pyrolite (org.spark-project:pyrolite:2.0.1 - http://pythonhosted.org/Pyro4/)
 (MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
 (The MIT License) Mockito (org.mockito:mockito-all:1.8.5 - http://www.mockito.org)
+(MIT License) jquery (https://jquery.org/license/)

bin/run-example

+2 -1

@@ -29,7 +29,8 @@ if [ -n "$1" ]; then
 else
   echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
   echo "  - set MASTER=XX to use a specific master" 1>&2
-  echo "  - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2
+  echo "  - can use abbreviated example class name relative to com.apache.spark.examples" 1>&2
+  echo "     (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2
   exit 1
 fi

bin/run-example2.cmd

+2 -1

@@ -32,7 +32,8 @@ rem Test that an argument was given
 if not "x%1"=="x" goto arg_given
   echo Usage: run-example ^<example-class^> [example-args]
   echo   - set MASTER=XX to use a specific master
-  echo   - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)
+  echo   - can use abbreviated example class name relative to com.apache.spark.examples
+  echo      (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)
   goto exit
 :arg_given

core/src/main/scala/org/apache/spark/Logging.scala

+7 -3

@@ -39,13 +39,17 @@ trait Logging {
   // be serialized and used on another machine
   @transient private var log_ : Logger = null

+  // Method to get the logger name for this object
+  protected def logName = {
+    // Ignore trailing $'s in the class names for Scala objects
+    this.getClass.getName.stripSuffix("$")
+  }
+
   // Method to get or create the logger for this object
   protected def log: Logger = {
     if (log_ == null) {
       initializeIfNecessary()
-      var className = this.getClass.getName
-      // Ignore trailing $'s in the class names for Scala objects
-      log_ = LoggerFactory.getLogger(className.stripSuffix("$"))
+      log_ = LoggerFactory.getLogger(logName)
     }
     log_
   }
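
A hypothetical illustration, not part of this commit: because the logger-name computation is now a protected method rather than a local variable inside log, a class mixing in Logging could override logName to report a custom logger name. All names below are made up.

    import org.apache.spark.Logging

    class IngestJob extends Logging {
      // Assumed override point exposed by the refactor above.
      override protected def logName = "com.example.ingest.IngestJob"

      def run(): Unit = {
        logInfo("starting ingest")  // logInfo is provided by the Logging trait
      }
    }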

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

+7 -2

@@ -71,7 +71,7 @@ private[spark] class Worker(
   // TTL for app folders/data; after TTL expires it will be cleaned up
   val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

-
+  val testing: Boolean = sys.props.contains("spark.testing")
   val masterLock: Object = new Object()
   var master: ActorSelection = null
   var masterAddress: Address = null
@@ -82,7 +82,12 @@ private[spark] class Worker(
   @volatile var connected = false
   val workerId = generateWorkerId()
   val sparkHome =
-    new File(sys.props.get("spark.test.home").orElse(sys.env.get("SPARK_HOME")).getOrElse("."))
+    if (testing) {
+      assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
+      new File(sys.props("spark.test.home"))
+    } else {
+      new File(sys.env.get("SPARK_HOME").getOrElse("."))
+    }
   var workDir: File = null
   val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new HashMap[String, ExecutorRunner]

core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala

+2 -3

@@ -46,9 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
   metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).sum
-      val remainingMem = storageStatusList.map(_.memRemaining).sum
-      (maxMem - remainingMem) / 1024 / 1024
+      val memUsed = storageStatusList.map(_.memUsed).sum
+      memUsed / 1024 / 1024
     }
   })
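
The gauge rewrite above is a pure simplification: StorageStatus defines memRemaining as maxMem - memUsed (see the StorageUtils.scala hunk below), so summing memUsed directly gives the same value as sum(maxMem) - sum(memRemaining). A self-contained sanity check with made-up numbers:

    // Illustrative only; Status stands in for StorageStatus.
    case class Status(maxMem: Long, memUsed: Long) {
      def memRemaining: Long = maxMem - memUsed
    }

    val statuses = Seq(Status(maxMem = 1024L, memUsed = 300L), Status(maxMem = 2048L, memUsed = 500L))
    val oldValue = statuses.map(_.maxMem).sum - statuses.map(_.memRemaining).sum  // 3072 - 2272
    val newValue = statuses.map(_.memUsed).sum                                    // 300 + 500
    assert(oldValue == newValue)  // both equal 800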

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala

+4 -7

@@ -172,16 +172,13 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
   def memRemaining: Long = maxMem - memUsed

   /** Return the memory used by this block manager. */
-  def memUsed: Long =
-    _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
+  def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum

   /** Return the disk space used by this block manager. */
-  def diskUsed: Long =
-    _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
+  def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum

   /** Return the off-heap space used by this block manager. */
-  def offHeapUsed: Long =
-    _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum
+  def offHeapUsed: Long = _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum

   /** Return the memory used by the given RDD in this block manager in O(1) time. */
   def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
@@ -246,7 +243,7 @@ private[spark] object StorageUtils {
     val rddId = rddInfo.id
     // Assume all blocks belonging to the same RDD have the same storage level
     val storageLevel = statuses
-      .map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel.NONE)
+      .flatMap(_.rddStorageLevel(rddId)).headOption.getOrElse(StorageLevel.NONE)
     val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
     val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
     val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
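
The second hunk relies on a standard Scala identity: for a function that returns an Option, xs.map(f).flatMap(s => s) produces the same result as xs.flatMap(f). A minimal standalone sketch (the lookup and values are illustrative):

    def rddStorageLevel(id: Int): Option[String] =
      if (id == 1) Some("MEMORY_ONLY") else None

    val ids = Seq(0, 1, 2)
    val oldStyle = ids.map(rddStorageLevel).flatMap(s => s)  // map, then flatten
    val newStyle = ids.flatMap(rddStorageLevel)              // single flatMap
    assert(oldStyle == newStyle)
    println(oldStyle.headOption.getOrElse("NONE"))           // first defined level, else a default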

core/src/test/scala/org/apache/spark/DriverSuite.scala

+1 -1

@@ -34,7 +34,7 @@ import scala.language.postfixOps
 class DriverSuite extends FunSuite with Timeouts {

   test("driver should exit after finishing") {
-    val sparkHome = sys.props("spark.test.home")
+    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
     val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
     forAll(masters) { (master: String) =>

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

+1 -1

@@ -295,7 +295,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {

   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   def runSparkSubmit(args: Seq[String]): String = {
-    val sparkHome = sys.props("spark.test.home")
+    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     Utils.executeAndGetOutput(
       Seq("./bin/spark-submit") ++ args,
       new File(sparkHome),

core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala

+1 -1

@@ -27,7 +27,7 @@ import org.apache.spark.SparkConf
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
     def f(s:String) = new File(s)
-    val sparkHome = sys.props("spark.test.home")
+    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val appId = "12345-worker321-9876"
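
The three test changes above share one pattern: read a required system property and fail the test with an explicit message when it is missing. A hedged standalone sketch of that pattern; the helper name is invented, and fail comes from ScalaTest's Assertions:

    import org.scalatest.Assertions._

    // Illustrative helper, not part of the commit.
    def requiredProperty(name: String): String =
      sys.props.getOrElse(name, fail(s"$name is not set!"))

    val sparkHome = requiredProperty("spark.test.home")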

dev/audit-release/audit_release.py

+2 -2

@@ -105,7 +105,7 @@ def get_url(url):
     "spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl",
     "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
     "spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq",
-    "spark-catalyst", "spark-sql", "spark-hive"
+    "spark-catalyst", "spark-sql", "spark-hive", "spark-streaming-kinesis-asl"
 ]
 modules = map(lambda m: "%s_%s" % (m, SCALA_BINARY_VERSION), modules)

@@ -136,7 +136,7 @@ def ensure_path_not_present(x):
 os.chdir(original_dir)

 # SBT application tests
-for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", "sbt_app_sql", "sbt_app_hive"]:
+for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", "sbt_app_sql", "sbt_app_hive", "sbt_app_kinesis"]:
     os.chdir(app)
     ret = run_cmd("sbt clean run", exit_on_failure=False)
     test(ret == 0, "sbt application (%s)" % app)

dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala

+7

@@ -50,5 +50,12 @@ object SimpleApp {
       println("Ganglia sink was loaded via spark-core")
       System.exit(-1)
     }
+
+    // Remove kinesis from default build due to ASL license issue
+    val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
+    if (foundKinesis) {
+      println("Kinesis was loaded via spark-core")
+      System.exit(-1)
+    }
   }
 }
@@ -0,0 +1,28 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+name := "Kinesis Test"
+
+version := "1.0"
+
+scalaVersion := System.getenv.get("SCALA_VERSION")
+
+libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % System.getenv.get("SPARK_VERSION")
+
+resolvers ++= Seq(
+  "Spark Release Repository" at System.getenv.get("SPARK_RELEASE_REPOSITORY"),
+  "Spray Repository" at "http://repo.spray.cc/")
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main.scala
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+
+object SimpleApp {
+  def main(args: Array[String]) {
+    val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
+    if (!foundKinesis) {
+      println("Kinesis not loaded via kinesis-asl")
+      System.exit(-1)
+    }
+  }
+}

dev/audit-release/sbt_app_sql/src/main/scala/SqlApp.scala

+1 -1

@@ -38,7 +38,7 @@ object SparkSqlExample {

     import sqlContext._
     val people = sc.makeRDD(1 to 100, 10).map(x => Person(s"Name$x", x))
-    people.registerAsTable("people")
+    people.registerTempTable("people")
     val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
     val teenagerNames = teenagers.map(t => "Name: " + t(0)).collect()
     teenagerNames.foreach(println)

dev/create-release/create-release.sh

+2 -2

@@ -53,15 +53,15 @@ if [[ ! "$@" =~ --package-only ]]; then
     -Dusername=$GIT_USERNAME -Dpassword=$GIT_PASSWORD \
     -Dmaven.javadoc.skip=true \
     -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
     -Dtag=$GIT_TAG -DautoVersionSubmodules=true \
+    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
     --batch-mode release:prepare

   mvn -DskipTests \
     -Darguments="-DskipTests=true -Dmaven.javadoc.skip=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \
     -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
     -Dmaven.javadoc.skip=true \
-    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
+    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
     release:perform

   cd ..

dev/run-tests

+3

@@ -36,6 +36,9 @@ fi
 if [ -z "$SBT_MAVEN_PROFILES_ARGS" ]; then
   export SBT_MAVEN_PROFILES_ARGS="-Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0"
 fi
+
+export SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Pkinesis-asl"
+
 echo "SBT_MAVEN_PROFILES_ARGS=\"$SBT_MAVEN_PROFILES_ARGS\""

 # Remove work directory
docs/sql-programming-guide.md

+9 -9

@@ -142,7 +142,7 @@ case class Person(name: String, age: Int)

 // Create an RDD of Person objects and register it as a table.
 val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
-people.registerAsTable("people")
+people.registerTempTable("people")

 // SQL statements can be run by using the sql methods provided by sqlContext.
 val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
@@ -210,7 +210,7 @@ JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").m

 // Apply a schema to an RDD of JavaBeans and register it as a table.
 JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
-schemaPeople.registerAsTable("people");
+schemaPeople.registerTempTable("people");

 // SQL can be run over RDDs that have been registered as tables.
 JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
@@ -248,7 +248,7 @@ people = parts.map(lambda p: {"name": p[0], "age": int(p[1])})
 # In future versions of PySpark we would like to add support for registering RDDs with other
 # datatypes as tables
 schemaPeople = sqlContext.inferSchema(people)
-schemaPeople.registerAsTable("people")
+schemaPeople.registerTempTable("people")

 # SQL can be run over SchemaRDDs that have been registered as a table.
 teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
@@ -292,7 +292,7 @@ people.saveAsParquetFile("people.parquet")
 val parquetFile = sqlContext.parquetFile("people.parquet")

 //Parquet files can also be registered as tables and then used in SQL statements.
-parquetFile.registerAsTable("parquetFile")
+parquetFile.registerTempTable("parquetFile")
 val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
 teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
 {% endhighlight %}
@@ -314,7 +314,7 @@ schemaPeople.saveAsParquetFile("people.parquet");
 JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet");

 //Parquet files can also be registered as tables and then used in SQL statements.
-parquetFile.registerAsTable("parquetFile");
+parquetFile.registerTempTable("parquetFile");
 JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
 List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
   public String call(Row row) {
@@ -340,7 +340,7 @@ schemaPeople.saveAsParquetFile("people.parquet")
 parquetFile = sqlContext.parquetFile("people.parquet")

 # Parquet files can also be registered as tables and then used in SQL statements.
-parquetFile.registerAsTable("parquetFile");
+parquetFile.registerTempTable("parquetFile");
 teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
 teenNames = teenagers.map(lambda p: "Name: " + p.name)
 for teenName in teenNames.collect():
@@ -378,7 +378,7 @@ people.printSchema()
 // |-- name: StringType

 // Register this SchemaRDD as a table.
-people.registerAsTable("people")
+people.registerTempTable("people")

 // SQL statements can be run by using the sql methods provided by sqlContext.
 val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
@@ -416,7 +416,7 @@ people.printSchema();
 // |-- name: StringType

 // Register this JavaSchemaRDD as a table.
-people.registerAsTable("people");
+people.registerTempTable("people");

 // SQL statements can be run by using the sql methods provided by sqlContext.
 JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
@@ -455,7 +455,7 @@ people.printSchema()
 # |-- name: StringType

 # Register this SchemaRDD as a table.
-people.registerAsTable("people")
+people.registerTempTable("people")

 # SQL statements can be run by using the sql methods provided by sqlContext.
 teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

docs/streaming-custom-receivers.md

+2 -2

@@ -4,7 +4,7 @@ title: Spark Streaming Custom Receivers
 ---

 Spark Streaming can receive streaming data from any arbitrary data source beyond
-the one's for which it has in-built support (that is, beyond Flume, Kafka, files, sockets, etc.).
+the one's for which it has in-built support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.).
 This requires the developer to implement a *receiver* that is customized for receiving data from
 the concerned data source. This guide walks through the process of implementing a custom receiver
 and using it in a Spark Streaming application.
@@ -174,7 +174,7 @@ val words = lines.flatMap(_.split(" "))
 ...
 {% endhighlight %}

-The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala).
+The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala).

 </div>
 <div data-lang="java" markdown="1">
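
For context, the guide being edited describes writing a custom receiver. A hedged, minimal sketch of such a receiver, independent of the Kinesis changes in this commit (the class and its data are made up; the base class shown is org.apache.spark.streaming.receiver.Receiver):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class StaticLinesReceiver(lines: Seq[String])
      extends Receiver[String](StorageLevel.MEMORY_ONLY) {

      def onStart(): Unit = {
        // Receivers should do their work on a separate thread so onStart returns quickly.
        new Thread("StaticLinesReceiver") {
          override def run(): Unit = lines.foreach(store)  // store() hands each record to Spark Streaming
        }.start()
      }

      def onStop(): Unit = { /* nothing to clean up for this in-memory source */ }
    }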
