
Commit 7e4ad04

Merge branch 'master' into metrics

2 parents fbe9029 + f0c87dc
33 files changed: +250 -70 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala (-1)

@@ -49,7 +49,6 @@ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkD
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

core/src/main/scala/org/apache/spark/util/Utils.scala (+18 -1)

@@ -20,9 +20,11 @@ package org.apache.spark.util
 import java.io._
 import java.net._
 import java.nio.ByteBuffer
-import java.util.{Locale, Random, UUID}
+import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
 
+import org.apache.log4j.PropertyConfigurator
+
 import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
@@ -834,6 +836,7 @@ private[spark] object Utils extends Logging {
     val exitCode = process.waitFor()
     stdoutThread.join() // Wait for it to finish reading output
     if (exitCode != 0) {
+      logError(s"Process $command exited with code $exitCode: ${output}")
       throw new SparkException("Process " + command + " exited with code " + exitCode)
     }
     output.toString
@@ -1444,6 +1447,20 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * config a log4j properties used for testsuite
+   */
+  def configTestLog4j(level: String): Unit = {
+    val pro = new Properties()
+    pro.put("log4j.rootLogger", s"$level, console")
+    pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender")
+    pro.put("log4j.appender.console.target", "System.err")
+    pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout")
+    pro.put("log4j.appender.console.layout.ConversionPattern",
+      "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
+    PropertyConfigurator.configure(pro)
+  }
+
 }
 
 /**
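The new helper is what the test drivers further down switch to. A minimal usage sketch, assuming a caller package under org.apache.spark (Utils is private[spark]; the package and object names below are invented for illustration):

    package org.apache.spark.example  // assumption: any package under org.apache.spark

    import org.apache.spark.util.Utils

    object TestDriverSketch {
      def main(args: Array[String]): Unit = {
        // Sends everything at INFO and above to System.err through the console
        // appender and pattern that configTestLog4j installs via PropertyConfigurator.
        Utils.configTestLog4j("INFO")
        // ... run the test workload ...
      }
    }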

core/src/test/scala/org/apache/spark/DriverSuite.scala (+1 -4)

@@ -19,9 +19,6 @@ package org.apache.spark
 
 import java.io.File
 
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-
 import org.scalatest.FunSuite
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.prop.TableDrivenPropertyChecks._
@@ -54,7 +51,7 @@ class DriverSuite extends FunSuite with Timeouts {
  */
 object DriverWithoutCleanup {
   def main(args: Array[String]) {
-    Logger.getRootLogger().setLevel(Level.WARN)
+    Utils.configTestLog4j("INFO")
     val sc = new SparkContext(args(0), "DriverWithoutCleanup")
     sc.parallelize(1 to 100, 4).count()
   }

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala (+2)

@@ -317,6 +317,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
 
 object JarCreationTest {
   def main(args: Array[String]) {
+    Utils.configTestLog4j("INFO")
     val conf = new SparkConf()
     val sc = new SparkContext(conf)
     val result = sc.makeRDD(1 to 100, 10).mapPartitions { x =>
@@ -338,6 +339,7 @@ object JarCreationTest {
 
 object SimpleApplicationTest {
   def main(args: Array[String]) {
+    Utils.configTestLog4j("INFO")
     val conf = new SparkConf()
     val sc = new SparkContext(conf)
     val configs = Seq("spark.master", "spark.app.name")

examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala (+1 -1)

@@ -20,7 +20,7 @@ package org.apache.spark.examples.graphx
 import org.apache.spark.SparkContext._
 import org.apache.spark._
 import org.apache.spark.graphx._
-import org.apache.spark.examples.graphx.Analytics
+
 
 /**
  * Uses GraphX to run PageRank on a LiveJournal social network graph. Download the dataset from

python/pyspark/context.py (+14)

@@ -232,6 +232,20 @@ def _ensure_initialized(cls, instance=None, gateway=None):
         else:
             SparkContext._active_spark_context = instance
 
+    def __enter__(self):
+        """
+        Enable 'with SparkContext(...) as sc: app(sc)' syntax.
+        """
+        return self
+
+    def __exit__(self, type, value, trace):
+        """
+        Enable 'with SparkContext(...) as sc: app' syntax.
+
+        Specifically stop the context on exit of the with block.
+        """
+        self.stop()
+
     @classmethod
     def setSystemProperty(cls, key, value):
         """

python/pyspark/tests.py (+29)

@@ -1255,6 +1255,35 @@ def test_single_script_on_cluster(self):
         self.assertIn("[2, 4, 6]", out)
 
 
+class ContextStopTests(unittest.TestCase):
+
+    def test_stop(self):
+        sc = SparkContext()
+        self.assertNotEqual(SparkContext._active_spark_context, None)
+        sc.stop()
+        self.assertEqual(SparkContext._active_spark_context, None)
+
+    def test_with(self):
+        with SparkContext() as sc:
+            self.assertNotEqual(SparkContext._active_spark_context, None)
+        self.assertEqual(SparkContext._active_spark_context, None)
+
+    def test_with_exception(self):
+        try:
+            with SparkContext() as sc:
+                self.assertNotEqual(SparkContext._active_spark_context, None)
+                raise Exception()
+        except:
+            pass
+        self.assertEqual(SparkContext._active_spark_context, None)
+
+    def test_with_stop(self):
+        with SparkContext() as sc:
+            self.assertNotEqual(SparkContext._active_spark_context, None)
+            sc.stop()
+        self.assertEqual(SparkContext._active_spark_context, None)
+
+
 @unittest.skipIf(not _have_scipy, "SciPy not installed")
 class SciPyTests(PySparkTestCase):

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala (+4, mode 100644 -> 100755)

@@ -82,6 +82,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val DISTINCT = Keyword("DISTINCT")
   protected val FALSE = Keyword("FALSE")
   protected val FIRST = Keyword("FIRST")
+  protected val LAST = Keyword("LAST")
   protected val FROM = Keyword("FROM")
   protected val FULL = Keyword("FULL")
   protected val GROUP = Keyword("GROUP")
@@ -125,6 +126,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val SUBSTR = Keyword("SUBSTR")
   protected val SUBSTRING = Keyword("SUBSTRING")
   protected val SQRT = Keyword("SQRT")
+  protected val ABS = Keyword("ABS")
 
   // Use reflection to find the reserved words defined in this class.
   protected val reservedWords =
@@ -315,6 +317,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
       case s ~ _ ~ _ ~ _ ~ _ ~ e => ApproxCountDistinct(e, s.toDouble)
     } |
     FIRST ~> "(" ~> expression <~ ")" ^^ { case exp => First(exp) } |
+    LAST ~> "(" ~> expression <~ ")" ^^ { case exp => Last(exp) } |
     AVG ~> "(" ~> expression <~ ")" ^^ { case exp => Average(exp) } |
     MIN ~> "(" ~> expression <~ ")" ^^ { case exp => Min(exp) } |
     MAX ~> "(" ~> expression <~ ")" ^^ { case exp => Max(exp) } |
@@ -330,6 +333,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
       case s ~ "," ~ p ~ "," ~ l => Substring(s,p,l)
     } |
     SQRT ~> "(" ~> expression <~ ")" ^^ { case exp => Sqrt(exp) } |
+    ABS ~> "(" ~> expression <~ ")" ^^ { case exp => Abs(exp) } |
     ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ {
       case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs)
     }
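With the two keywords and the parse rules above, queries like the following now go through the parser. A sketch assuming an existing SQLContext named sqlContext and a registered table people with an Int column delta (both invented for illustration):

    // Assumed setup: sqlContext: org.apache.spark.sql.SQLContext and a
    // registered table "people" with an Int column "delta".
    val lastDelta = sqlContext.sql("SELECT LAST(delta) FROM people")  // new LAST aggregate
    val absDelta  = sqlContext.sql("SELECT ABS(delta) FROM people")   // new ABS function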

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala (+21 -17)

@@ -26,10 +26,22 @@ object HiveTypeCoercion {
   // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
   // The conversion for integral and floating point types have a linear widening hierarchy:
   val numericPrecedence =
-    Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
-  // Boolean is only wider than Void
-  val booleanPrecedence = Seq(NullType, BooleanType)
-  val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
+    Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
+  val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: Nil
+
+  def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
+    val valueTypes = Seq(t1, t2).filter(t => t != NullType)
+    if (valueTypes.distinct.size > 1) {
+      // Try and find a promotion rule that contains both types in question.
+      val applicableConversion =
+        HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2))
+
+      // If found return the widest common type, otherwise None
+      applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
+    } else {
+      Some(if (valueTypes.size == 0) NullType else valueTypes.head)
+    }
+  }
 }
 
 /**
@@ -53,17 +65,6 @@ trait HiveTypeCoercion {
     Division ::
     Nil
 
-  trait TypeWidening {
-    def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
-      // Try and find a promotion rule that contains both types in question.
-      val applicableConversion =
-        HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2))
-
-      // If found return the widest common type, otherwise None
-      applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
-    }
-  }
-
   /**
    * Applies any changes to [[AttributeReference]] data types that are made by other rules to
    * instances higher in the query tree.
@@ -144,7 +145,8 @@ trait HiveTypeCoercion {
    * - LongType to FloatType
    * - LongType to DoubleType
    */
-  object WidenTypes extends Rule[LogicalPlan] with TypeWidening {
+  object WidenTypes extends Rule[LogicalPlan] {
+    import HiveTypeCoercion._
 
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       case u @ Union(left, right) if u.childrenResolved && !u.resolved =>
@@ -352,7 +354,9 @@ trait HiveTypeCoercion {
   /**
   * Coerces the type of different branches of a CASE WHEN statement to a common type.
   */
-  object CaseWhenCoercion extends Rule[LogicalPlan] with TypeWidening {
+  object CaseWhenCoercion extends Rule[LogicalPlan] {
+    import HiveTypeCoercion._
+
     def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       case cw @ CaseWhen(branches) if !cw.resolved && !branches.exists(!_.resolved) =>
         val valueTypes = branches.sliding(2, 2).map {
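A few worked cases of the relocated findTightestCommonType, following the code above: NullType is filtered out before matching, numeric types widen along numericPrecedence, and pairs covered by no promotion rule yield None (import paths follow this branch's package layout):

    import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion.findTightestCommonType
    import org.apache.spark.sql.catalyst.types._

    findTightestCommonType(IntegerType, LongType)    // Some(LongType): widest of the two in numericPrecedence
    findTightestCommonType(NullType, DoubleType)     // Some(DoubleType): NullType defers to the other side
    findTightestCommonType(NullType, NullType)       // Some(NullType): no value types at all
    findTightestCommonType(StringType, IntegerType)  // None: no promotion rule contains both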

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala (+1, mode 100644 -> 100755)

@@ -132,6 +132,7 @@ package object dsl {
   def approxCountDistinct(e: Expression, rsd: Double = 0.05) = ApproxCountDistinct(e, rsd)
   def avg(e: Expression) = Average(e)
   def first(e: Expression) = First(e)
+  def last(e: Expression) = Last(e)
   def min(e: Expression) = Min(e)
   def max(e: Expression) = Max(e)
   def upper(e: Expression) = Upper(e)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala (+3 -1)

@@ -227,7 +227,9 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
     new SpecificMutableRow(newValues)
   }
 
-  override def update(ordinal: Int, value: Any): Unit = values(ordinal).update(value)
+  override def update(ordinal: Int, value: Any): Unit = {
+    if (value == null) setNullAt(ordinal) else values(ordinal).update(value)
+  }
 
   override def iterator: Iterator[Any] = values.map(_.boxed).iterator
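A plausible reading of the change: the row's cells are specialized, primitive-backed MutableValue slots, so a null cannot be stored in the cell itself and has to be recorded through setNullAt instead. A stripped-down sketch of the same pattern (the cell class below is invented for illustration):

    // Hypothetical single-column analogue of a primitive-backed mutable cell.
    final class MutableIntCell {
      var value: Int = 0
      var isNull: Boolean = true

      def update(v: Any): Unit =
        if (v == null) isNull = true                        // mirrors setNullAt(ordinal)
        else { value = v.asInstanceOf[Int]; isNull = false }
    }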

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala (+28, mode 100644 -> 100755)

@@ -344,6 +344,21 @@ case class First(child: Expression) extends PartialAggregate with trees.UnaryNod
   override def newInstance() = new FirstFunction(child, this)
 }
 
+case class Last(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
+  override def references = child.references
+  override def nullable = true
+  override def dataType = child.dataType
+  override def toString = s"LAST($child)"
+
+  override def asPartial: SplitEvaluation = {
+    val partialLast = Alias(Last(child), "PartialLast")()
+    SplitEvaluation(
+      Last(partialLast.toAttribute),
+      partialLast :: Nil)
+  }
+  override def newInstance() = new LastFunction(child, this)
+}
+
 case class AverageFunction(expr: Expression, base: AggregateExpression)
   extends AggregateFunction {
 
@@ -489,3 +504,16 @@ case class FirstFunction(expr: Expression, base: AggregateExpression) extends Ag
 
   override def eval(input: Row): Any = result
 }
+
+case class LastFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
+  def this() = this(null, null) // Required for serialization.
+
+  var result: Any = null
+
+  override def update(input: Row): Unit = {
+    result = input
+  }
+
+  override def eval(input: Row): Any = if (result != null) expr.eval(result.asInstanceOf[Row])
+    else null
+}
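A plain-Scala sketch of the semantics LastFunction implements: every update overwrites the stored row, so eval reads the tracked expression from whichever row arrived last, and asPartial mirrors this by taking a LAST of per-partition LASTs (the example assumes the partitions are combined in this order):

    val partitions   = Seq(Seq(1, 2), Seq(3, 4), Seq(5))
    val partialLasts = partitions.map(_.last)  // Seq(2, 4, 5): one partial LAST per partition
    val finalLast    = partialLasts.last       // 5, the same answer LAST(x) gives over the full input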

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala (+15)

@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.types._
+import scala.math.pow
 
 case class UnaryMinus(child: Expression) extends UnaryExpression {
   type EvaluatedType = Any
@@ -129,3 +130,17 @@ case class MaxOf(left: Expression, right: Expression) extends Expression {
 
   override def toString = s"MaxOf($left, $right)"
 }
+
+/**
+ * A function that get the absolute value of the numeric value.
+ */
+case class Abs(child: Expression) extends UnaryExpression {
+  type EvaluatedType = Any
+
+  def dataType = child.dataType
+  override def foldable = child.foldable
+  def nullable = child.nullable
+  override def toString = s"Abs($child)"
+
+  override def eval(input: Row): Any = n1(child, input, _.abs(_))
+}
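The eval line delegates to the n1 helper, which looks up a Numeric instance for the child's data type and applies the supplied function to the evaluated value. The same dispatch can be sketched with scala.math.Numeric directly:

    // Type-class dispatch analogous to n1(child, input, _.abs(_)).
    def absOf[T](value: T)(implicit num: Numeric[T]): T = num.abs(value)

    absOf(-3)    // 3
    absOf(-2.5)  // 2.5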

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala (+3 -2)

@@ -265,12 +265,13 @@ case class CaseWhen(branches: Seq[Expression]) extends Expression {
       false
     } else {
       val allCondBooleans = predicates.forall(_.dataType == BooleanType)
-      val dataTypesEqual = values.map(_.dataType).distinct.size <= 1
+      // both then and else val should be considered.
+      val dataTypesEqual = (values ++ elseValue).map(_.dataType).distinct.size <= 1
       allCondBooleans && dataTypesEqual
     }
   }
 
-  /** Written in imperative fashion for performance considerations. Same for CaseKeyWhen. */
+  /** Written in imperative fashion for performance considerations. */
   override def eval(input: Row): Any = {
     val len = branchesArr.length
     var i = 0
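A worked instance of why the else value has to be folded in: for CASE WHEN c THEN 1 ELSE 'a' END, the THEN values alone share a single data type, so the old check accepted the expression even though its branches disagree:

    import org.apache.spark.sql.catalyst.types._

    val thenValueTypes = Seq(IntegerType)                 // from THEN 1
    val elseValueType  = Seq(StringType)                  // from ELSE 'a'

    thenValueTypes.distinct.size <= 1                     // true:  old check wrongly passes
    (thenValueTypes ++ elseValueType).distinct.size <= 1  // false: new check rejects the mixed branches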

0 commit comments