[SPARK-5785] [PySpark] narrow dependency for cogroup/join in PySpark #4629
Conversation
Test build #27573 has started for PR 4629 at commit
    } else {
      new UnionRDD(this, rdds)
    }
  }

  /** Build the union of a list of RDDs passed as variable-length arguments. */
  def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
Can we change this method to call the `union` method that you modified so the change will take effect here, too?
fixed
Test build #27582 has started for PR 4629 at commit
Test build #27583 has started for PR 4629 at commit
Test build #27587 has started for PR 4629 at commit
Test build #27573 has finished for PR 4629 at commit
Test PASSed.
Test build #27582 has finished for PR 4629 at commit
Test FAILed.
Test build #610 has started for PR 4629 at commit
Test build #27583 has finished for PR 4629 at commit
Test PASSed.
Test build #27587 has finished for PR 4629 at commit
Test PASSed.
Test build #610 has finished for PR 4629 at commit
@@ -961,11 +961,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
  }

  /** Build the union of a list of RDDs. */
  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
    val partitioners = rdds.map(_.partitioner).toSet
If `_.partitioner` is an Option, then I think this can be simplified by using `flatMap` instead of `map`, since that would let you just check whether `partitioners.size == 1` on the next line without needing the `isDefined` check as well.
LGTM overall; this is tricky logic, though, so I'll take one more pass through when I get home.
Test build #27612 has started for PR 4629 at commit
Test build #27612 has finished for PR 4629 at commit
Test FAILed.
Test build #611 has started for PR 4629 at commit
Test build #611 has finished for PR 4629 at commit
@@ -740,6 +739,27 @@ def test_multiple_python_java_RDD_conversions(self):
        converted_rdd = RDD(data_python_rdd, self.sc)
        self.assertEqual(2, converted_rdd.count())

    def test_narrow_dependency_in_join(self):
        rdd = self.sc.parallelize(range(10)).map(lambda x: (x, x))
Do these tests actually check for a narrow dependency at all? I think they will pass even without it. I'm not sure of a better suggestion, though. I had to use `getNarrowDependencies` in another PR to check this:
https://github.com/apache/spark/pull/4449/files#diff-4bc3643ce90b54113cad7104f91a075bR582
but I don't think that is even exposed in pyspark ...
This test is only for correctness; I will add more checks for the narrow dependency based on the Python progress API (#3027).
I've merged #3027, so I think we can now test this by setting a job group, running a job, then querying the statusTracker to determine how many stages were actually run as part of that job.
done!
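For reference, here is a minimal standalone sketch of that stage-counting approach (the group name, partition counts, and local master are illustrative, not the exact test that was merged):

```python
from pyspark import SparkContext

sc = SparkContext("local[2]", "narrow-dependency-check")
tracker = sc.statusTracker()

# Two pair RDDs partitioned by the same partitioner (same partition count).
a = sc.parallelize(range(10)).map(lambda x: (x, x)).partitionBy(2)
b = sc.parallelize(range(10)).map(lambda x: (x, 2 * x)).partitionBy(2)

# Tag the job with a group so it can be looked up in the status tracker.
sc.setJobGroup("narrow_join", "join of co-partitioned RDDs", True)
result = a.join(b, 2).collect()

# With a narrow dependency, the join job should not add an extra shuffle
# stage beyond the ones created by the two partitionBy() calls.
job_id = tracker.getJobIdsForGroup("narrow_join")[0]
print("stages in join job:", len(tracker.getJobInfo(job_id).stageIds))

sc.stop()
```

The idea is that the status tracker exposes how many stages a job actually ran, which is an indirect but observable signal of whether the extra shuffle was avoided.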
nice!
Test build #27657 has started for PR 4629 at commit
Test build #27657 has finished for PR 4629 at commit
Test PASSed.
Thanks for adding the test. LGTM, so I'm going to merge this into
Currently, PySpark does not support a narrow dependency during cogroup/join when the two RDDs have the same partitioner, so an unnecessary extra shuffle stage is introduced. The Python implementation of cogroup/join is different from the Scala one: it depends on union() and partitionBy(). This patch tries to use PartitionerAwareUnionRDD() in union() when all the RDDs have the same partitioner. It also fixes `reservePartitioner` in all the map() or mapPartitions() calls, so partitionBy() can skip the unnecessary shuffle stage.

Author: Davies Liu <davies@databricks.com>

Closes #4629 from davies/narrow and squashes the following commits:

dffe34e [Davies Liu] improve test, check number of stages for join/cogroup
1ed3ba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into narrow
4d29932 [Davies Liu] address comment
cc28d97 [Davies Liu] add unit tests
940245e [Davies Liu] address comments
ff5a0a6 [Davies Liu] skip the partitionBy() on Python side
eb26c62 [Davies Liu] narrow dependency in PySpark

(cherry picked from commit c3d2b90)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
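To illustrate what the change enables, here is a minimal usage sketch (RDD contents, partition counts, and the local master are illustrative), assuming two PySpark RDDs that share a partitioner:

```python
from pyspark import SparkContext

sc = SparkContext("local[2]", "copartitioned-join")

# Both RDDs are hash-partitioned into the same number of partitions,
# so they end up with equal partitioners.
users = sc.parallelize(range(1000)).map(lambda x: (x, "user-%d" % x)).partitionBy(8)
scores = sc.parallelize(range(1000)).map(lambda x: (x, x * 10)).partitionBy(8)

# PySpark's join/cogroup is built on union() + partitionBy(). With this patch,
# union() keeps the common partitioner and the subsequent partitionBy(8)
# recognizes it, so the join is computed with a narrow dependency instead of
# another shuffle.
joined = users.join(scores, 8)

print(joined.getNumPartitions())  # 8: the common partitioner is preserved
print(joined.take(3))

sc.stop()
```

Before this patch, the same join would have gone through an additional shuffle stage even though both inputs were already partitioned identically.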