
Commit e8336ec

Merge pull request apache#66 from markhamstra/csd-1.4
SKIPME merging Apache branch-1.4 bug fixes
2 parents 98e8af7 + cb102d5 commit e8336ec

File tree

6 files changed: +159 -37 lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 3 additions & 7 deletions
@@ -352,8 +352,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   /**
    * Comparison function that defines the sort order for application attempts within the same
-   * application. Order is: running attempts before complete attempts, running attempts sorted
-   * by start time, completed attempts sorted by end time.
+   * application. Order is: attempts are sorted by descending start time.
+   * Most recent attempt state matches with current state of the app.
    *
    * Normally applications should have a single running attempt; but failure to call sc.stop()
    * may cause multiple running attempts to show up.
@@ -363,11 +363,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private def compareAttemptInfo(
       a1: FsApplicationAttemptInfo,
       a2: FsApplicationAttemptInfo): Boolean = {
-    if (a1.completed == a2.completed) {
-      if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
-    } else {
-      !a1.completed
-    }
+    a1.startTime >= a2.startTime
   }
 
   /**
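
A minimal sketch of the new ordering semantics (plain Python for illustration only; the attempt dictionaries and field names below are invented, this is not the Scala code above): attempts are sorted by descending start time, so the head of the list is the most recent attempt and its state mirrors the application's current state.

attempts = [
    {"attemptId": "attempt1", "startTime": 1, "completed": False},
    {"attemptId": "attempt3", "startTime": 3, "completed": True},
    {"attemptId": "attempt2", "startTime": 2, "completed": False},
]

# Sort by descending start time, mirroring `a1.startTime >= a2.startTime`.
ordered = sorted(attempts, key=lambda a: a["startTime"], reverse=True)

assert [a["attemptId"] for a in ordered] == ["attempt3", "attempt2", "attempt1"]
# The head attempt is the latest one, so its completed flag reflects the
# application's current state.
assert ordered[0]["completed"] is True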

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 11 additions & 13 deletions
@@ -239,13 +239,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     appListAfterRename.size should be (1)
   }
 
-  test("apps with multiple attempts") {
+  test("apps with multiple attempts with order") {
     val provider = new FsHistoryProvider(createTestConf())
 
-    val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+    val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
     writeFile(attempt1, true, None,
-      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
-      SparkListenerApplicationEnd(2L)
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1"))
       )
 
     updateAndCheck(provider) { list =>
@@ -255,7 +254,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
     val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
     writeFile(attempt2, true, None,
-      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
+      SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2"))
       )
 
     updateAndCheck(provider) { list =>
@@ -264,30 +263,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       list.head.attempts.head.attemptId should be (Some("attempt2"))
     }
 
-    val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
-    attempt2.delete()
-    writeFile(attempt2, true, None,
-      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+    val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false)
+    writeFile(attempt3, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")),
       SparkListenerApplicationEnd(4L)
       )
 
     updateAndCheck(provider) { list =>
       list should not be (null)
       list.size should be (1)
-      list.head.attempts.size should be (2)
-      list.head.attempts.head.attemptId should be (Some("attempt2"))
+      list.head.attempts.size should be (3)
+      list.head.attempts.head.attemptId should be (Some("attempt3"))
     }
 
     val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
-    writeFile(attempt2, true, None,
+    writeFile(attempt1, true, None,
       SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
       SparkListenerApplicationEnd(6L)
       )
 
     updateAndCheck(provider) { list =>
       list.size should be (2)
       list.head.attempts.size should be (1)
-      list.last.attempts.size should be (2)
+      list.last.attempts.size should be (3)
       list.head.attempts.head.attemptId should be (Some("attempt1"))
 
       list.foreach { case app =>

graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala

Lines changed: 7 additions & 2 deletions
@@ -332,9 +332,9 @@ object GraphImpl {
       edgeStorageLevel: StorageLevel,
       vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
     val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
-      .withTargetStorageLevel(edgeStorageLevel).cache()
+      .withTargetStorageLevel(edgeStorageLevel)
     val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
-      .withTargetStorageLevel(vertexStorageLevel).cache()
+      .withTargetStorageLevel(vertexStorageLevel)
     GraphImpl(vertexRDD, edgeRDD)
   }
 
@@ -346,9 +346,14 @@ object GraphImpl {
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: VertexRDD[VD],
       edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
+
+    vertices.cache()
+
     // Convert the vertex partitions in edges to the correct type
     val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
       .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
+      .cache()
+
     GraphImpl.fromExistingRDDs(vertices, newEdges)
   }
 
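
The change above stops caching the intermediate edgeRDD/vertexRDD built in fromEdgeRDD and instead caches the vertices and the re-typed newEdges that GraphImpl.apply actually reuses. A rough PySpark sketch of the same design choice, using generic RDDs since GraphX has no Python API (names and data are illustrative only): cache the dataset that is traversed more than once, not an intermediate that is transformed once and discarded.

from pyspark import SparkContext

sc = SparkContext("local[2]", "cache-placement-sketch")

raw = sc.parallelize(range(1000))               # intermediate: consumed once, not cached
pairs = raw.map(lambda x: (x % 10, x)).cache()  # reused by both actions below, so cache here

counts = pairs.countByKey()                     # first traversal materializes the cache
maxima = pairs.reduceByKey(max).collect()       # second traversal reuses the cached partitions

sc.stop()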

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -146,7 +146,7 @@
     <chill.version>0.5.0</chill.version>
     <ivy.version>2.4.0</ivy.version>
     <oro.version>2.0.8</oro.version>
-    <codahale.metrics.version>3.1.0</codahale.metrics.version>
+    <codahale.metrics.version>3.1.2</codahale.metrics.version>
     <avro.version>1.7.7</avro.version>
     <avro.mapred.classifier>hadoop2</avro.mapred.classifier>
     <jets3t.version>0.7.1</jets3t.version>

python/pyspark/rdd.py

Lines changed: 8 additions & 2 deletions
@@ -849,6 +849,9 @@ def func(iterator):
             for obj in iterator:
                 acc = op(obj, acc)
             yield acc
+        # collecting result of mapPartitions here ensures that the copy of
+        # zeroValue provided to each partition is unique from the one provided
+        # to the final reduce call
         vals = self.mapPartitions(func).collect()
         return reduce(op, vals, zeroValue)
 
@@ -878,8 +881,11 @@ def func(iterator):
             for obj in iterator:
                 acc = seqOp(acc, obj)
             yield acc
-
-        return self.mapPartitions(func).fold(zeroValue, combOp)
+        # collecting result of mapPartitions here ensures that the copy of
+        # zeroValue provided to each partition is unique from the one provided
+        # to the final reduce call
+        vals = self.mapPartitions(func).collect()
+        return reduce(combOp, vals, zeroValue)
 
     def treeAggregate(self, zeroValue, seqOp, combOp, depth=2):
         """

python/pyspark/tests.py

Lines changed: 129 additions & 12 deletions
@@ -529,10 +529,127 @@ def test_deleting_input_files(self):
 
     def test_sampling_default_seed(self):
         # Test for SPARK-3995 (default seed setting)
-        data = self.sc.parallelize(range(1000), 1)
+        data = self.sc.parallelize(xrange(1000), 1)
         subset = data.takeSample(False, 10)
         self.assertEqual(len(subset), 10)
 
+    def test_aggregate_mutable_zero_value(self):
+        # Test for SPARK-9021; uses aggregate and treeAggregate to build dict
+        # representing a counter of ints
+        # NOTE: dict is used instead of collections.Counter for Python 2.6
+        # compatibility
+        from collections import defaultdict
+
+        # Show that single or multiple partitions work
+        data1 = self.sc.range(10, numSlices=1)
+        data2 = self.sc.range(10, numSlices=2)
+
+        def seqOp(x, y):
+            x[y] += 1
+            return x
+
+        def comboOp(x, y):
+            for key, val in y.items():
+                x[key] += val
+            return x
+
+        counts1 = data1.aggregate(defaultdict(int), seqOp, comboOp)
+        counts2 = data2.aggregate(defaultdict(int), seqOp, comboOp)
+        counts3 = data1.treeAggregate(defaultdict(int), seqOp, comboOp, 2)
+        counts4 = data2.treeAggregate(defaultdict(int), seqOp, comboOp, 2)
+
+        ground_truth = defaultdict(int, dict((i, 1) for i in range(10)))
+        self.assertEqual(counts1, ground_truth)
+        self.assertEqual(counts2, ground_truth)
+        self.assertEqual(counts3, ground_truth)
+        self.assertEqual(counts4, ground_truth)
+
+    def test_aggregate_by_key_mutable_zero_value(self):
+        # Test for SPARK-9021; uses aggregateByKey to make a pair RDD that
+        # contains lists of all values for each key in the original RDD
+
+        # list(range(...)) for Python 3.x compatibility (can't use * operator
+        # on a range object)
+        # list(zip(...)) for Python 3.x compatibility (want to parallelize a
+        # collection, not a zip object)
+        tuples = list(zip(list(range(10))*2, [1]*20))
+        # Show that single or multiple partitions work
+        data1 = self.sc.parallelize(tuples, 1)
+        data2 = self.sc.parallelize(tuples, 2)
+
+        def seqOp(x, y):
+            x.append(y)
+            return x
+
+        def comboOp(x, y):
+            x.extend(y)
+            return x
+
+        values1 = data1.aggregateByKey([], seqOp, comboOp).collect()
+        values2 = data2.aggregateByKey([], seqOp, comboOp).collect()
+        # Sort lists to ensure clean comparison with ground_truth
+        values1.sort()
+        values2.sort()
+
+        ground_truth = [(i, [1]*2) for i in range(10)]
+        self.assertEqual(values1, ground_truth)
+        self.assertEqual(values2, ground_truth)
+
+    def test_fold_mutable_zero_value(self):
+        # Test for SPARK-9021; uses fold to merge an RDD of dict counters into
+        # a single dict
+        # NOTE: dict is used instead of collections.Counter for Python 2.6
+        # compatibility
+        from collections import defaultdict
+
+        counts1 = defaultdict(int, dict((i, 1) for i in range(10)))
+        counts2 = defaultdict(int, dict((i, 1) for i in range(3, 8)))
+        counts3 = defaultdict(int, dict((i, 1) for i in range(4, 7)))
+        counts4 = defaultdict(int, dict((i, 1) for i in range(5, 6)))
+        all_counts = [counts1, counts2, counts3, counts4]
+        # Show that single or multiple partitions work
+        data1 = self.sc.parallelize(all_counts, 1)
+        data2 = self.sc.parallelize(all_counts, 2)
+
+        def comboOp(x, y):
+            for key, val in y.items():
+                x[key] += val
+            return x
+
+        fold1 = data1.fold(defaultdict(int), comboOp)
+        fold2 = data2.fold(defaultdict(int), comboOp)
+
+        ground_truth = defaultdict(int)
+        for counts in all_counts:
+            for key, val in counts.items():
+                ground_truth[key] += val
+        self.assertEqual(fold1, ground_truth)
+        self.assertEqual(fold2, ground_truth)
+
+    def test_fold_by_key_mutable_zero_value(self):
+        # Test for SPARK-9021; uses foldByKey to make a pair RDD that contains
+        # lists of all values for each key in the original RDD
+
+        tuples = [(i, range(i)) for i in range(10)]*2
+        # Show that single or multiple partitions work
+        data1 = self.sc.parallelize(tuples, 1)
+        data2 = self.sc.parallelize(tuples, 2)
+
+        def comboOp(x, y):
+            x.extend(y)
+            return x
+
+        values1 = data1.foldByKey([], comboOp).collect()
+        values2 = data2.foldByKey([], comboOp).collect()
+        # Sort lists to ensure clean comparison with ground_truth
+        values1.sort()
+        values2.sort()
+
+        # list(range(...)) for Python 3.x compatibility
+        ground_truth = [(i, list(range(i))*2) for i in range(10)]
+        self.assertEqual(values1, ground_truth)
+        self.assertEqual(values2, ground_truth)
+
     def test_aggregate_by_key(self):
         data = self.sc.parallelize([(1, 1), (1, 1), (3, 2), (5, 1), (5, 3)], 2)
 
@@ -624,8 +741,8 @@ def test_zip_with_different_serializers(self):
 
     def test_zip_with_different_object_sizes(self):
         # regress test for SPARK-5973
-        a = self.sc.parallelize(range(10000)).map(lambda i: '*' * i)
-        b = self.sc.parallelize(range(10000, 20000)).map(lambda i: '*' * i)
+        a = self.sc.parallelize(xrange(10000)).map(lambda i: '*' * i)
+        b = self.sc.parallelize(xrange(10000, 20000)).map(lambda i: '*' * i)
         self.assertEqual(10000, a.zip(b).count())
 
     def test_zip_with_different_number_of_items(self):
@@ -647,7 +764,7 @@ def test_zip_with_different_number_of_items(self):
         self.assertRaises(Exception, lambda: a.zip(b).count())
 
     def test_count_approx_distinct(self):
-        rdd = self.sc.parallelize(range(1000))
+        rdd = self.sc.parallelize(xrange(1000))
         self.assertTrue(950 < rdd.countApproxDistinct(0.03) < 1050)
         self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.03) < 1050)
         self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.03) < 1050)
@@ -777,7 +894,7 @@ def test_distinct(self):
     def test_external_group_by_key(self):
         self.sc._conf.set("spark.python.worker.memory", "1m")
         N = 200001
-        kv = self.sc.parallelize(range(N)).map(lambda x: (x % 3, x))
+        kv = self.sc.parallelize(xrange(N)).map(lambda x: (x % 3, x))
         gkv = kv.groupByKey().cache()
         self.assertEqual(3, gkv.count())
         filtered = gkv.filter(lambda kv: kv[0] == 1)
@@ -871,7 +988,7 @@ def test_narrow_dependency_in_join(self):
 
     # Regression test for SPARK-6294
     def test_take_on_jrdd(self):
-        rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
+        rdd = self.sc.parallelize(xrange(1 << 20)).map(lambda x: str(x))
         rdd._jrdd.first()
 
     def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
@@ -1503,13 +1620,13 @@ def run():
             self.fail("daemon had been killed")
 
         # run a normal job
-        rdd = self.sc.parallelize(range(100), 1)
+        rdd = self.sc.parallelize(xrange(100), 1)
         self.assertEqual(100, rdd.map(str).count())
 
     def test_after_exception(self):
         def raise_exception(_):
             raise Exception()
-        rdd = self.sc.parallelize(range(100), 1)
+        rdd = self.sc.parallelize(xrange(100), 1)
         with QuietTest(self.sc):
             self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
         self.assertEqual(100, rdd.map(str).count())
@@ -1525,22 +1642,22 @@ def test_after_jvm_exception(self):
         with QuietTest(self.sc):
             self.assertRaises(Exception, lambda: filtered_data.count())
 
-        rdd = self.sc.parallelize(range(100), 1)
+        rdd = self.sc.parallelize(xrange(100), 1)
         self.assertEqual(100, rdd.map(str).count())
 
     def test_accumulator_when_reuse_worker(self):
         from pyspark.accumulators import INT_ACCUMULATOR_PARAM
         acc1 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM)
-        self.sc.parallelize(range(100), 20).foreach(lambda x: acc1.add(x))
+        self.sc.parallelize(xrange(100), 20).foreach(lambda x: acc1.add(x))
         self.assertEqual(sum(range(100)), acc1.value)
 
         acc2 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM)
-        self.sc.parallelize(range(100), 20).foreach(lambda x: acc2.add(x))
+        self.sc.parallelize(xrange(100), 20).foreach(lambda x: acc2.add(x))
         self.assertEqual(sum(range(100)), acc2.value)
         self.assertEqual(sum(range(100)), acc1.value)
 
     def test_reuse_worker_after_take(self):
-        rdd = self.sc.parallelize(range(100000), 1)
+        rdd = self.sc.parallelize(xrange(100000), 1)
         self.assertEqual(0, rdd.first())
 
         def count():
