[SPARK-5785] [PySpark] narrow dependency for cogroup/join in PySpark #4629
Changes from all commits: eb26c62, ff5a0a6, 940245e, cc28d97, 4d29932, 1ed3ba2, dffe34e
@@ -303,6 +303,7 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeExce
 private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   RDD[(Long, Array[Byte])](prev) {
   override def getPartitions = prev.partitions
+  override val partitioner = prev.partitioner
   override def compute(split: Partition, context: TaskContext) =
     prev.iterator(split, context).grouped(2).map {
       case Seq(a, b) => (Utils.deserializeLongValue(a), b)
@@ -329,6 +330,15 @@ private[spark] object PythonRDD extends Logging {
     }
   }
+
+  /**
+   * Return an RDD of values from an RDD of (Long, Array[Byte]), with preservesPartitioning=true.
+   *
+   * This is useful for PySpark to keep the partitioner after partitionBy().
+   */
+  def valueOfPair(pair: JavaPairRDD[Long, Array[Byte]]): JavaRDD[Array[Byte]] = {
Review comment: I think that …

Author reply: In the Scala/Java API, RDD.values() changes an RDD of (K, V) into an RDD of V. For PySpark, this changes the RDD from (hash, [(K, V)]) to (K, V).
+    pair.rdd.mapPartitions(it => it.map(_._2), true)
+  }

   /**
    * Adapter for calling SparkContext#runJob from Python.
    *
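Taken together, these two changes let a PySpark RDD carry its partitioner through the Java round trip: PairwiseRDD keeps the parent's partitioner, and valueOfPair extracts the serialized values without discarding it. A minimal PySpark sketch of the effect (assuming a local SparkContext; the variable names are illustrative, not from this patch):

```python
from pyspark import SparkContext

sc = SparkContext("local[2]", "narrow-dependency-demo")

# partitionBy() shuffles the data as (hash, batch-of-pairs) records; with
# the partitioner preserved through PairwiseRDD and valueOfPair, the
# resulting RDD still knows it is hash-partitioned into 2 partitions.
parted = sc.parallelize(range(10)).map(lambda x: (x, x)).partitionBy(2)

# Joining an RDD with itself (same partitioner on both sides) can now use
# a narrow dependency: each output partition reads exactly one partition
# of each parent, with no extra shuffle.
print(sorted(parted.join(parted).collect())[:3])
# [(0, (0, 0)), (1, (1, 1)), (2, (2, 2))]

sc.stop()
```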
@@ -727,7 +727,6 @@ def test_multiple_python_java_RDD_conversions(self):
             (u'1', {u'director': u'David Lean'}),
             (u'2', {u'director': u'Andrew Dominik'})
         ]
-        from pyspark.rdd import RDD
         data_rdd = self.sc.parallelize(data)
         data_java_rdd = data_rdd._to_java_object_rdd()
         data_python_rdd = self.sc._jvm.SerDe.javaToPython(data_java_rdd)
@@ -740,6 +739,43 @@ def test_multiple_python_java_RDD_conversions(self):
         converted_rdd = RDD(data_python_rdd, self.sc)
         self.assertEqual(2, converted_rdd.count())

+    def test_narrow_dependency_in_join(self):
+        rdd = self.sc.parallelize(range(10)).map(lambda x: (x, x))
Review comment: Do these tests actually check for a narrow dependency at all? I think they would pass even without it. I'm not sure of a better suggestion, though. I had to use … but I don't think that is even exposed in PySpark.

Author reply: This test is only for correctness; I will add more checks for the narrow dependency based on the Python progress API (#3027).

Review comment: I've merged #3027, so I think we can now test this by setting a job group, running a job, then querying the statusTracker to determine how many stages were actually run as part of that job (see the sketch after this diff).

Author reply: done!

Review comment: nice!
+        parted = rdd.partitionBy(2)
+        self.assertEqual(2, parted.union(parted).getNumPartitions())
+        self.assertEqual(rdd.getNumPartitions() + 2, parted.union(rdd).getNumPartitions())
+        self.assertEqual(rdd.getNumPartitions() + 2, rdd.union(parted).getNumPartitions())
+
+        self.sc.setJobGroup("test1", "test", True)
+        tracker = self.sc.statusTracker()
+
+        d = sorted(parted.join(parted).collect())
+        self.assertEqual(10, len(d))
+        self.assertEqual((0, (0, 0)), d[0])
+        jobId = tracker.getJobIdsForGroup("test1")[0]
+        self.assertEqual(2, len(tracker.getJobInfo(jobId).stageIds))
+
+        self.sc.setJobGroup("test2", "test", True)
+        d = sorted(parted.join(rdd).collect())
+        self.assertEqual(10, len(d))
+        self.assertEqual((0, (0, 0)), d[0])
+        jobId = tracker.getJobIdsForGroup("test2")[0]
+        self.assertEqual(3, len(tracker.getJobInfo(jobId).stageIds))
+
+        self.sc.setJobGroup("test3", "test", True)
+        d = sorted(parted.cogroup(parted).collect())
+        self.assertEqual(10, len(d))
+        self.assertEqual([[0], [0]], map(list, d[0][1]))
+        jobId = tracker.getJobIdsForGroup("test3")[0]
+        self.assertEqual(2, len(tracker.getJobInfo(jobId).stageIds))
+
+        self.sc.setJobGroup("test4", "test", True)
+        d = sorted(parted.cogroup(rdd).collect())
+        self.assertEqual(10, len(d))
+        self.assertEqual([[0], [0]], map(list, d[0][1]))
+        jobId = tracker.getJobIdsForGroup("test4")[0]
+        self.assertEqual(3, len(tracker.getJobInfo(jobId).stageIds))
+
+
 class ProfilerTests(PySparkTestCase):
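The technique the review settled on, counting stages via the status tracker, also works outside the test suite. A minimal sketch (assuming an interactive session with an existing SparkContext `sc`; the group name is illustrative):

```python
# Tag the job with a group so we can look it up afterwards.
sc.setJobGroup("narrow-check", "verify narrow dependency", True)

parted = sc.parallelize(range(100)).map(lambda x: (x, x)).partitionBy(4)
parted.join(parted).count()

tracker = sc.statusTracker()
job_id = tracker.getJobIdsForGroup("narrow-check")[0]
stage_ids = tracker.getJobInfo(job_id).stageIds

# With the narrow dependency, this job runs 2 stages: the partitionBy
# shuffle stage and the result stage. A second shuffle for the join
# would show up here as a third stage.
print(len(stage_ids))  # 2
```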
Review comment: Can we change this method to call the union method that you modified, so the change will take effect here, too?

Author reply: fixed
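As a closing illustration of the union behavior the tests above assert (a hypothetical session, assuming an existing SparkContext `sc`): when both inputs share the same partitioner, Spark can use a partitioner-aware union that keeps the partition count; otherwise the partition counts simply add up.

```python
rdd = sc.parallelize(range(10)).map(lambda x: (x, x))
parted = rdd.partitionBy(2)

# Same partitioner on both sides: partitioner-aware union, still 2 partitions.
assert parted.union(parted).getNumPartitions() == 2

# Mixed partitioning: plain union, partition counts add up.
assert parted.union(rdd).getNumPartitions() == rdd.getNumPartitions() + 2
assert rdd.union(parted).getNumPartitions() == rdd.getNumPartitions() + 2
```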