add unit tests
Davies Liu committed Feb 16, 2015
commit cc28d97cc5c629102333ac9a91a7d323583cd4e6
22 changes: 21 additions & 1 deletion python/pyspark/tests.py
@@ -727,7 +727,6 @@ def test_multiple_python_java_RDD_conversions(self):
             (u'1', {u'director': u'David Lean'}),
             (u'2', {u'director': u'Andrew Dominik'})
         ]
-        from pyspark.rdd import RDD
         data_rdd = self.sc.parallelize(data)
         data_java_rdd = data_rdd._to_java_object_rdd()
         data_python_rdd = self.sc._jvm.SerDe.javaToPython(data_java_rdd)
@@ -740,6 +739,27 @@ def test_multiple_python_java_RDD_conversions(self):
         converted_rdd = RDD(data_python_rdd, self.sc)
         self.assertEqual(2, converted_rdd.count())

+    def test_narrow_dependency_in_join(self):
+        rdd = self.sc.parallelize(range(10)).map(lambda x: (x, x))
Contributor:
do these tests actually check for a narrow dependency at all? I think they will pass even without it.

I'm not sure of a better suggestion, though. I had to use getNarrowDependencies in another PR to check this:
https://github.com/apache/spark/pull/4449/files#diff-4bc3643ce90b54113cad7104f91a075bR582

but I don't think that is even exposed in pyspark ...

Contributor Author:

This test is only for correctness; I will add more checks for the narrow dependency based on the Python progress API (#3027).

Contributor:
I've merged #3027, so I think we can now test this by setting a job group, running a job, then querying the statusTracker to determine how many stages were actually run as part of that job.
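
Roughly like this, say (just an untested sketch — the job group names are arbitrary, and I haven't pinned down the exact stage counts, hence the inequality rather than exact numbers):

    rdd = self.sc.parallelize(range(10)).map(lambda x: (x, x))
    parted = rdd.partitionBy(2)

    # Run each join under its own job group so we can find its job afterwards.
    self.sc.setJobGroup("narrow", "co-partitioned join")
    parted.join(parted).count()
    self.sc.setJobGroup("wide", "join that must repartition one side")
    parted.join(rdd).count()

    tracker = self.sc.statusTracker()
    narrow_job = tracker.getJobIdsForGroup("narrow")[0]
    wide_job = tracker.getJobIdsForGroup("wide")[0]
    # The co-partitioned join should get by with fewer stages, since joining
    # two RDDs with the same partitioner needs no extra shuffle.
    self.assertTrue(len(tracker.getJobInfo(narrow_job).stageIds) <
                    len(tracker.getJobInfo(wide_job).stageIds))
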

Contributor Author:
done!

Contributor:
nice!

+        parted = rdd.partitionBy(2)
+        self.assertEqual(2, parted.union(parted).getNumPartitions())
+        self.assertEqual(rdd.getNumPartitions() + 2, parted.union(rdd).getNumPartitions())
+        self.assertEqual(rdd.getNumPartitions() + 2, rdd.union(parted).getNumPartitions())
+
+        d = sorted(parted.join(parted).collect())
+        self.assertEqual(10, len(d))
+        self.assertEqual((0, (0, 0)), d[0])
+        d = sorted(parted.join(rdd).collect())
+        self.assertEqual(10, len(d))
+        self.assertEqual((0, (0, 0)), d[0])
+
+        d = sorted(parted.cogroup(parted).collect())
+        self.assertEqual(10, len(d))
+        self.assertEqual([[0], [0]], map(list, d[0][1]))
+        d = sorted(parted.cogroup(rdd).collect())
+        self.assertEqual(10, len(d))
+        self.assertEqual([[0], [0]], map(list, d[0][1]))


class ProfilerTests(PySparkTestCase):
