Commit d4c5f43

Merge pull request apache#6 from apache/master: merge upstream changes
2 parents dc1ba9e + ba5bcad

23 files changed: +236 -74 lines

README.md

Lines changed: 2 additions & 2 deletions
@@ -4,8 +4,8 @@ Spark is a fast and general cluster computing system for Big Data. It provides
 high-level APIs in Scala, Java, and Python, and an optimized engine that
 supports general computation graphs for data analysis. It also supports a
 rich set of higher-level tools including Spark SQL for SQL and structured
-data processing, MLLib for machine learning, GraphX for graph processing,
-and Spark Streaming.
+data processing, MLlib for machine learning, GraphX for graph processing,
+and Spark Streaming for stream processing.

 <http://spark.apache.org/>

bin/pyspark

Lines changed: 2 additions & 0 deletions
@@ -85,6 +85,8 @@ export PYSPARK_SUBMIT_ARGS

 # For pyspark tests
 if [[ -n "$SPARK_TESTING" ]]; then
+  unset YARN_CONF_DIR
+  unset HADOOP_CONF_DIR
   if [[ -n "$PYSPARK_DOC_TEST" ]]; then
     exec "$PYSPARK_PYTHON" -m doctest $1
   else

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 3 additions & 4 deletions
@@ -1064,11 +1064,10 @@ abstract class RDD[T: ClassTag](
       // greater than totalParts because we actually cap it at totalParts in runJob.
       var numPartsToTry = 1
       if (partsScanned > 0) {
-        // If we didn't find any rows after the first iteration, just try all partitions next.
-        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
-        // by 50%.
+        // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
+        // interpolate the number of partitions we need to try, but overestimate it by 50%.
         if (buf.size == 0) {
-          numPartsToTry = totalParts - 1
+          numPartsToTry = partsScanned * 4
         } else {
           numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
         }
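
The effect of the new rule is easier to see with concrete numbers (the same change lands in python/pyspark/rdd.py below). The sketch replays the arithmetic from the hunk above; the request size, buffer sizes and partition counts are made-up example values, not from the patch:

// Standalone sketch of the scan-growth rule in RDD.take() after this change.
object TakeScanGrowthSketch {
  def nextPartsToTry(partsScanned: Int, bufSize: Int, num: Int): Int = {
    var numPartsToTry = 1
    if (partsScanned > 0) {
      if (bufSize == 0) {
        // Nothing found yet: quadruple and retry (the old code jumped to all partitions here).
        numPartsToTry = partsScanned * 4
      } else {
        // Some rows found: interpolate, overestimating by 50%.
        numPartsToTry = (1.5 * num * partsScanned / bufSize).toInt
      }
    }
    numPartsToTry
  }

  def main(args: Array[String]): Unit = {
    // Requesting num = 100 rows; partsScanned/bufSize reflect the state at the start of each pass.
    println(nextPartsToTry(partsScanned = 0, bufSize = 0, num = 100))   // 1
    println(nextPartsToTry(partsScanned = 1, bufSize = 0, num = 100))   // 4
    println(nextPartsToTry(partsScanned = 5, bufSize = 0, num = 100))   // 20
    println(nextPartsToTry(partsScanned = 25, bufSize = 30, num = 100)) // 125
  }
}

The growth stays geometric on empty passes, so a take(num) that used to jump straight to a full scan after one empty pass now ramps up gradually.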

ec2/spark_ec2.py

Lines changed: 25 additions & 8 deletions
@@ -102,9 +102,17 @@ def parse_args():
       "(for debugging)")
   parser.add_option(
       "--ebs-vol-size", metavar="SIZE", type="int", default=0,
-      help="Attach a new EBS volume of size SIZE (in GB) to each node as " +
-           "/vol. The volumes will be deleted when the instances terminate. " +
-           "Only possible on EBS-backed AMIs.")
+      help="Size (in GB) of each EBS volume.")
+  parser.add_option(
+      "--ebs-vol-type", default="standard",
+      help="EBS volume type (e.g. 'gp2', 'standard').")
+  parser.add_option(
+      "--ebs-vol-num", type="int", default=1,
+      help="Number of EBS volumes to attach to each node as /vol[x]. " +
+           "The volumes will be deleted when the instances terminate. " +
+           "Only possible on EBS-backed AMIs. " +
+           "EBS volumes are only attached if --ebs-vol-size > 0." +
+           "Only support up to 8 EBS volumes.")
   parser.add_option(
       "--swap", metavar="SWAP", type="int", default=1024,
       help="Swap space to set up per node, in MB (default: 1024)")
@@ -348,13 +356,16 @@ def launch_cluster(conn, opts, cluster_name):
     print >> stderr, "Could not find AMI " + opts.ami
     sys.exit(1)

-  # Create block device mapping so that we can add an EBS volume if asked to
+  # Create block device mapping so that we can add EBS volumes if asked to.
+  # The first drive is attached as /dev/sds, 2nd as /dev/sdt, ... /dev/sdz
   block_map = BlockDeviceMapping()
   if opts.ebs_vol_size > 0:
-    device = EBSBlockDeviceType()
-    device.size = opts.ebs_vol_size
-    device.delete_on_termination = True
-    block_map["/dev/sdv"] = device
+    for i in range(opts.ebs_vol_num):
+      device = EBSBlockDeviceType()
+      device.size = opts.ebs_vol_size
+      device.volume_type=opts.ebs_vol_type
+      device.delete_on_termination = True
+      block_map["/dev/sd" + chr(ord('s') + i)] = device

   # AWS ignores the AMI-specified block device mapping for M3 (see SPARK-3342).
   if opts.instance_type.startswith('m3.'):
@@ -828,6 +839,12 @@ def get_partition(total, num_partitions, current_partitions):

 def real_main():
   (opts, action, cluster_name) = parse_args()
+
+  # Input parameter validation
+  if opts.ebs_vol_num > 8:
+    print >> stderr, "ebs-vol-num cannot be greater than 8"
+    sys.exit(1)
+
   try:
     conn = ec2.connect_to_region(opts.region)
   except Exception as e:
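
The device-naming arithmetic above is also why the new --ebs-vol-num option is capped at 8: block_map["/dev/sd" + chr(ord('s') + i)] walks the letters from /dev/sds up to /dev/sdz. A small Scala re-expression of that naming scheme, purely illustrative (the /dev/sd prefix and the cap come from the patch, everything else is invented):

// Re-expression of the EBS device-naming loop from spark_ec2.py above.
object EbsDeviceNameSketch {
  def deviceName(i: Int): String = "/dev/sd" + ('s' + i).toChar

  def main(args: Array[String]): Unit = {
    (0 until 8).foreach(i => println(deviceName(i)))
    // Prints /dev/sds, /dev/sdt, ..., /dev/sdz; 's' + 7 == 'z', which is what the
    // "ebs-vol-num cannot be greater than 8" check in real_main() protects.
  }
}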

python/pyspark/rdd.py

Lines changed: 4 additions & 4 deletions
@@ -1089,11 +1089,11 @@ def take(self, num):
             # we actually cap it at totalParts in runJob.
             numPartsToTry = 1
             if partsScanned > 0:
-                # If we didn't find any rows after the first iteration, just
-                # try all partitions next. Otherwise, interpolate the number
-                # of partitions we need to try, but overestimate it by 50%.
+                # If we didn't find any rows after the previous iteration,
+                # quadruple and retry. Otherwise, interpolate the number of
+                # partitions we need to try, but overestimate it by 50%.
                 if len(items) == 0:
-                    numPartsToTry = totalParts - 1
+                    numPartsToTry = partsScanned * 4
                 else:
                     numPartsToTry = int(1.5 * num * partsScanned / len(items))

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 1 deletion
@@ -272,7 +272,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     val currentTable = table(tableName).queryExecution.analyzed
     val asInMemoryRelation = currentTable match {
       case _: InMemoryRelation =>
-        currentTable.logicalPlan
+        currentTable

       case _ =>
         InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 12 additions & 4 deletions
@@ -60,10 +60,10 @@ case class SetCommand(
         logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
           s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
         context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
-        Array(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
+        Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
       } else {
         context.setConf(k, v)
-        Array(Row(s"$k=$v"))
+        Seq(Row(s"$k=$v"))
       }

     // Query the value bound to key k.
@@ -78,11 +78,19 @@ case class SetCommand(
         "hive-hwi-0.12.0.jar",
         "hive-0.12.0.jar").mkString(":")

-      Array(
+      context.getAllConfs.map { case (k, v) =>
+        Row(s"$k=$v")
+      }.toSeq ++ Seq(
         Row("system:java.class.path=" + hiveJars),
         Row("system:sun.java.command=shark.SharkServer2"))
     } else {
-      Array(Row(s"$k=${context.getConf(k, "<undefined>")}"))
+      if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
+        logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+          s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
+        Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}"))
+      } else {
+        Seq(Row(s"$k=${context.getConf(k, "<undefined>")}"))
+      }
     }

     // Query all key-value pairs that are set in the SQLConf of the context.
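
In user-facing terms, a hedged sketch of how SET treats the deprecated key after this hunk, assuming a 1.1-era SQLContext (or HiveContext) bound to sqlContext in a spark-shell; the returned rows follow the branches above:

// Illustrative spark-shell session; only the deprecated-key behaviour is shown.
sqlContext.sql("SET mapred.reduce.tasks=10").collect()
// warns that mapred.reduce.tasks is deprecated and returns
// Row("spark.sql.shuffle.partitions=10")

sqlContext.sql("SET mapred.reduce.tasks").collect()
// now also warns, and reports Row("spark.sql.shuffle.partitions=<numShufflePartitions>")
// rather than the deprecated key itself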

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 5 additions & 3 deletions
@@ -44,6 +44,8 @@ private[hive] case class SourceCommand(filePath: String) extends Command

 private[hive] case class AddFile(filePath: String) extends Command

+private[hive] case class AddJar(path: String) extends Command
+
 private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command

 private[hive] case class AnalyzeTable(tableName: String) extends Command
@@ -231,7 +233,7 @@ private[hive] object HiveQl {
     } else if (sql.trim.toLowerCase.startsWith("uncache table")) {
       CacheCommand(sql.trim.drop(14).trim, false)
     } else if (sql.trim.toLowerCase.startsWith("add jar")) {
-      NativeCommand(sql)
+      AddJar(sql.trim.drop(8).trim)
     } else if (sql.trim.toLowerCase.startsWith("add file")) {
       AddFile(sql.trim.drop(9))
     } else if (sql.trim.toLowerCase.startsWith("dfs")) {
@@ -1018,9 +1020,9 @@ private[hive] object HiveQl {

     /* Other functions */
     case Token("TOK_FUNCTION", Token(RAND(), Nil) :: Nil) => Rand
-    case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) => 
+    case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) =>
       Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType))
-    case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) => 
+    case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
       Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length))

     /* UDFs - Must be last otherwise will preempt built in functions */
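
The new branch is parsed positionally: drop(8) strips the leading "add jar " (the seven-character keyword plus the space after it), mirroring the existing add file branch's drop(9). A minimal standalone check with a hypothetical statement:

// Standalone illustration of the prefix stripping behind the new AddJar branch.
object AddJarParseSketch {
  def main(args: Array[String]): Unit = {
    val stmt = "ADD JAR /tmp/extra-udfs.jar"   // hypothetical input, not from the patch
    require(stmt.trim.toLowerCase.startsWith("add jar"))
    val path = stmt.trim.drop(8).trim          // same expression as in HiveQl above
    println(path)                              // prints /tmp/extra-udfs.jar
  }
}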

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 3 additions & 2 deletions
@@ -195,11 +195,12 @@ private[hive] trait HiveStrategies {

   case class HiveCommandStrategy(context: HiveContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.NativeCommand(sql) =>
-        NativeCommand(sql, plan.output)(context) :: Nil
+      case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil

       case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil

+      case hive.AddJar(path) => execution.AddJar(path) :: Nil
+
       case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil

       case describe: logical.DescribeCommand =>

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala

Lines changed: 16 additions & 0 deletions
@@ -60,3 +60,19 @@ case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with
     Seq.empty[Row]
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class AddJar(path: String) extends LeafNode with Command {
+  def hiveContext = sqlContext.asInstanceOf[HiveContext]
+
+  override def output = Seq.empty
+
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+    hiveContext.runSqlHive(s"ADD JAR $path")
+    hiveContext.sparkContext.addJar(path)
+    Seq.empty[Row]
+  }
+}

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 22 additions & 3 deletions
@@ -17,9 +17,11 @@

 package org.apache.spark.sql.hive.execution

+import java.io.File
+
 import scala.util.Try

-import org.apache.spark.sql.{SchemaRDD, Row}
+import org.apache.spark.SparkException
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
@@ -313,7 +315,7 @@ class HiveQuerySuite extends HiveComparisonTest {
     "SELECT srcalias.KEY, SRCALIAS.value FROM sRc SrCAlias WHERE SrCAlias.kEy < 15")

   test("case sensitivity: registered table") {
-    val testData: SchemaRDD =
+    val testData =
       TestHive.sparkContext.parallelize(
         TestData(1, "str1") ::
         TestData(2, "str2") :: Nil)
@@ -467,7 +469,7 @@ class HiveQuerySuite extends HiveComparisonTest {
     }

     // Describe a registered temporary table.
-    val testData: SchemaRDD =
+    val testData =
       TestHive.sparkContext.parallelize(
         TestData(1, "str1") ::
         TestData(1, "str2") :: Nil)
@@ -495,6 +497,23 @@ class HiveQuerySuite extends HiveComparisonTest {
     }
   }

+  test("ADD JAR command") {
+    val testJar = TestHive.getHiveFile("data/files/TestSerDe.jar").getCanonicalPath
+    sql("CREATE TABLE alter1(a INT, b INT)")
+    intercept[Exception] {
+      sql(
+        """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe'
+          |WITH serdeproperties('s1'='9')
+        """.stripMargin)
+    }
+    sql(s"ADD JAR $testJar")
+    sql(
+      """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe'
+        |WITH serdeproperties('s1'='9')
+      """.stripMargin)
+    sql("DROP TABLE alter1")
+  }
+
   test("parse HQL set commands") {
     // Adapted from its SQL counterpart.
     val testKey = "spark.sql.key.usedfortestonly"

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 5 additions & 2 deletions
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}

-import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.{SecurityManager, SparkConf, Logging}


 class ExecutorRunnable(
@@ -46,7 +46,8 @@ class ExecutorRunnable(
     slaveId: String,
     hostname: String,
     executorMemory: Int,
-    executorCores: Int)
+    executorCores: Int,
+    securityMgr: SecurityManager)
   extends Runnable with ExecutorRunnableUtil with Logging {

   var rpc: YarnRPC = YarnRPC.create(conf)
@@ -86,6 +87,8 @@ class ExecutorRunnable(
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)

+    ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+
     // Send the start request to the ContainerManager
     val startReq = Records.newRecord(classOf[StartContainerRequest])
       .asInstanceOf[StartContainerRequest]

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 11 additions & 9 deletions
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}

-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.scheduler.SplitInfo

 import org.apache.hadoop.conf.Configuration
@@ -41,21 +41,23 @@ private[yarn] class YarnAllocationHandler(
     resourceManager: AMRMProtocol,
     appAttemptId: ApplicationAttemptId,
     args: ApplicationMasterArguments,
-    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
-  extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
+    preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
+    securityMgr: SecurityManager)
+  extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {

   private val lastResponseId = new AtomicInteger()
   private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()

   override protected def allocateContainers(count: Int): YarnAllocateResponse = {
     var resourceRequests: List[ResourceRequest] = null

-    // default.
-    if (count <= 0 || preferredHostToCount.isEmpty) {
-      logDebug("numExecutors: " + count + ", host preferences: " +
-        preferredHostToCount.isEmpty)
-      resourceRequests = List(createResourceRequest(
-        AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
+    logDebug("numExecutors: " + count)
+    if (count <= 0) {
+      resourceRequests = List()
+    } else if (preferredHostToCount.isEmpty) {
+      logDebug("host preferences is empty")
+      resourceRequests = List(createResourceRequest(
+        AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
     } else {
       // request for all hosts in preferred nodes and for numExecutors -
       // candidates.size, request by default allocation policy.

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala

Lines changed: 4 additions & 3 deletions
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.util.Utils

@@ -45,15 +45,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
       sparkConf: SparkConf,
       preferredNodeLocations: Map[String, Set[SplitInfo]],
       uiAddress: String,
-      uiHistoryAddress: String) = {
+      uiHistoryAddress: String,
+      securityMgr: SecurityManager) = {
     this.rpc = YarnRPC.create(conf)
     this.uiHistoryAddress = uiHistoryAddress

     resourceManager = registerWithResourceManager(conf)
     registerApplicationMaster(uiAddress)

     new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
-      preferredNodeLocations)
+      preferredNodeLocations, securityMgr)
   }

   override def getAttemptId() = {
